KAFKA-5636: Add Sliding Windows documentation (#9264)

Add necessary documentation for KIP-450, adding sliding window aggregations to KStreams

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
leah 2020-09-10 23:55:11 -05:00 committed by GitHub
parent 7d0086e0c3
commit 8260d7cdfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 6 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

View File

@ -1103,6 +1103,14 @@
<span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;time-windowed-aggregated-stream-store&quot;</span><span class="o">)</span> <span class="cm">/* state store name */</span>
<span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde for aggregate value */</span>
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute sliding windows and 30-minute grace period)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">SlidingWindows</span><span class="o">.</span><span class="na">withTimeDifferenceAndGrace</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">),</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">)))</span>
<span class="o">.</span><span class="na">aggregate</span><span class="o">(</span>
<span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="n">L</span><span class="o">,</span> <span class="cm">/* initializer */</span>
<span class="o">(</span><span class="n">aggKey</span><span class="o">,</span> <span class="n">newValue</span><span class="o">,</span> <span class="n">aggValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span><span class="o">,</span> <span class="cm">/* adder */</span>
<span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;time-windowed-aggregated-stream-store&quot;</span><span class="o">)</span> <span class="cm">/* state store name */</span>
<span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde for aggregate value */</span>
<span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)).</span>
<span class="n">aggregate</span><span class="o">(</span>
@ -1221,6 +1229,11 @@
<span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* time-based window */</span>
<span class="o">.</span><span class="na">count</span><span class="o">();</span>
<span class="c1">// Counting a KGroupedStream with time-based windowing (here: with 5-minute sliding windows and 30-minute grace period)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">SlidingWindows</span><span class="o">.</span><span class="na">withTimeDifferenceAndGrace</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">),</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">)))</span> <span class="cm">/* time-based window */</span>
<span class="o">.</span><span class="na">count</span><span class="o">();</span>
<span class="c1">// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">aggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
@ -1343,6 +1356,13 @@
<span class="o">(</span><span class="n">aggValue</span><span class="o">,</span> <span class="n">newValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span> <span class="cm">/* adder */</span>
<span class="o">);</span>
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute sliding windows and 30-minute grace)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">SlidingWindows</span><span class="o">.</span><span class="na">withTimeDifferenceAndGrace</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">),</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">)))</span> <span class="cm">/* time-based window */</span><span class="o">)</span>
<span class="o">.</span><span class="na">reduce</span><span class="o">(</span>
<span class="o">(</span><span class="n">aggValue</span><span class="o">,</span> <span class="n">newValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">aggValue</span> <span class="o">+</span> <span class="n">newValue</span> <span class="cm">/* adder */</span>
<span class="o">);</span>
<span class="c1">// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">Windowed</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">sessionzedAggregatedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span>
<span class="n">SessionWindows</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">5</span><span class="o">)))</span> <span class="cm">/* session window */</span>
@ -3286,13 +3306,33 @@ become t=300,000).</span></p>
<div class="section" id="sliding-time-windows">
<span id="windowing-sliding"></span><h5><a class="toc-backref" href="#id22">Sliding time windows</a><a class="headerlink" href="#sliding-time-windows" title="Permalink to this headline"></a></h5>
<p>Sliding windows are actually quite different from hopping and tumbling windows. In Kafka Streams, sliding windows
are used only for <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">join operations</span></a>, and can be specified through the
<code class="docutils literal"><span class="pre">JoinWindows</span></code> class.</p>
<p>A sliding window models a fixed-size window that slides continuously over the time axis; here, two data records are
are used for <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">join operations</span></a>, specified by using the
<code class="docutils literal"><span class="pre">JoinWindows</span></code> class, and windowed aggregations, specified by using the <code class="docutils literal"><span class="pre">SlidingWindows</span></code> class.</p>
<p>A sliding window models a fixed-size window that slides continuously over the time axis. In this model, two data records are
said to be included in the same window if (in the case of symmetric windows) the difference of their timestamps is
within the window size. Thus, sliding windows are not aligned to the epoch, but to the data record timestamps. In
contrast to hopping and tumbling windows, the lower and upper window time interval bounds of sliding windows are
<em>both inclusive</em>.</p>
within the window size. As a sliding window moves along the time axis, records may fall into multiple snapshots of
the sliding window, but each unique combination of records appears only in one sliding window snapshot.</p>
<p>The following code defines a sliding window with a time difference of 10 minutes and a grace period of 30 minutes:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.SlidingWindows</span><span class="o">;</span>
<span class="c1">// A sliding time window with a time difference of 10 minutes and grace period of 30 minutes</span>
<span class="kt">Duration</span> <span class="n">timeDifferenceMs</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">10</span><span class="o">);</span>
<span class="kt">Duration</span> <span class="n">gracePeriodMs</span> <span class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">);</span>
<span class="n">SlidingWindows</span><span class="o">.</span><span class="na">withTimeDifferenceAndGrace</span><span class="o">(</span><span class="n">timeDifferenceMs</span><span class="o">,</span><span class="n">gracePeriodMs</span><span class="o">);</span>
</pre></div>
<div class="admonition note">
<p><b>Note</b></p>
<p>Sliding windows <em>require</em> that you set a grace period, as shown above. For time windows and session windows,
setting the grace period is optional and defaults to 24 hours.</p>
</div>
<div class="figure align-center" id="id35">
<img class="centered" src="/{{version}}/images/streams-sliding-windows.png">
<p class="caption"><span class="caption-text">This diagram shows windowing a stream of data records with sliding windows. The overlap of
the sliding window snapshots varies depending on the record times. In this diagram, the time numbers represent miliseconds. For example,
t=5 means &#8220;at the five milisecond mark&#8220;.</span></p>
</div>
<p>Sliding windows are aligned to the data record timestamps, not to the epoch. In contrast to hopping and tumbling windows,
the lower and upper window time interval bounds of sliding windows are both inclusive.</p>
</div>
<div class="section" id="session-windows">
<span id="windowing-session"></span><h5><a class="toc-backref" href="#id23">Session Windows</a><a class="headerlink" href="#session-windows" title="Permalink to this headline"></a></h5>

View File

@ -110,6 +110,11 @@
<code>delivery.timeout.ms</code> and <code>max.block.ms</code> for the producer and
<code>default.api.timeout.ms</code> for the admin client.
</p>
<p>
We added <code>SlidingWindows</code> as an option for <code>windowedBy()</code> windowed aggregations as described in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL">KIP-450</a>.
Sliding windows are fixed-time and data-aligned windows that allow for flexible and efficient windowed aggregations.
</p>
<h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
<p>