KAFKA-10003: Mark KStream.through() as deprecated and update Scala API (#8679)

- part of KIP-221

Co-authored-by: John Roesler <john@confluent.io>
Matthias J. Sax 2020-05-22 08:41:28 -07:00 committed by GitHub
parent d9e9a18a19
commit 27824baa21
25 changed files with 518 additions and 230 deletions

View File

@ -110,7 +110,7 @@ public class StreamsResetter {
private static String usage = "This tool helps to quickly reset an application in order to reprocess " private static String usage = "This tool helps to quickly reset an application in order to reprocess "
+ "its data from scratch.\n" + "its data from scratch.\n"
+ "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of " + "* This tool resets offsets of input topics to the earliest available offset and it skips to the end of "
+ "intermediate topics (topics used in the through() method).\n" + "intermediate topics (topics that are input and output topics, e.g., used by deprecated through() method).\n"
+ "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with " + "* This tool deletes the internal topics that were created by Kafka Streams (topics starting with "
+ "\"<application.id>-\").\n" + "\"<application.id>-\").\n"
+ "You do not need to specify internal topics because the tool finds them automatically.\n" + "You do not need to specify internal topics because the tool finds them automatically.\n"
@ -209,7 +209,7 @@ public class StreamsResetter {
.ofType(String.class) .ofType(String.class)
.withValuesSeparatedBy(',') .withValuesSeparatedBy(',')
.describedAs("list"); .describedAs("list");
intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method). For these topics, the tool will skip to the end.") intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics that are input and output topics, e.g., used in the deprecated through() method). For these topics, the tool will skip to the end.")
.withRequiredArg() .withRequiredArg()
.ofType(String.class) .ofType(String.class)
.withValuesSeparatedBy(',') .withValuesSeparatedBy(',')

View File

@ -33,7 +33,7 @@
<div class="section" id="data-types-and-serialization"> <div class="section" id="data-types-and-serialization">
<span id="streams-developer-guide-serdes"></span><h1>Data Types and Serialization<a class="headerlink" href="#data-types-and-serialization" title="Permalink to this headline"></a></h1> <span id="streams-developer-guide-serdes"></span><h1>Data Types and Serialization<a class="headerlink" href="#data-types-and-serialization" title="Permalink to this headline"></a></h1>
<p>Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. <code class="docutils literal"><span class="pre">java.lang.String</span></code>) to materialize the data when necessary. Operations that require such SerDes information include: <code class="docutils literal"><span class="pre">stream()</span></code>, <code class="docutils literal"><span class="pre">table()</span></code>, <code class="docutils literal"><span class="pre">to()</span></code>, <code class="docutils literal"><span class="pre">through()</span></code>, <code class="docutils literal"><span class="pre">groupByKey()</span></code>, <code class="docutils literal"><span class="pre">groupBy()</span></code>.</p> <p>Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. <code class="docutils literal"><span class="pre">java.lang.String</span></code>) to materialize the data when necessary. Operations that require such SerDes information include: <code class="docutils literal"><span class="pre">stream()</span></code>, <code class="docutils literal"><span class="pre">table()</span></code>, <code class="docutils literal"><span class="pre">to()</span></code>, <code class="docutils literal"><span class="pre">repartition()</span></code>, <code class="docutils literal"><span class="pre">groupByKey()</span></code>, <code class="docutils literal"><span class="pre">groupBy()</span></code>.</p>
<p>You can provide SerDes by using either of these methods:</p> <p>You can provide SerDes by using either of these methods:</p>
<ul class="simple"> <ul class="simple">
<li>By setting default SerDes in the <code class="docutils literal"><span class="pre">java.util.Properties</span></code> config instance.</li> <li>By setting default SerDes in the <code class="docutils literal"><span class="pre">java.util.Properties</span></code> config instance.</li>
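To make the SerDes requirement above concrete, here is a minimal sketch that passes SerDes explicitly to stream(), repartition(), and to(); the topic names and key/value types are illustrative assumptions, not part of this change:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;

public class ExplicitSerdesExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Explicit SerDes on read, repartition, and write; topic names are made up for illustration.
        final KStream<String, Long> stream = builder.stream("input-topic",
            Consumed.with(Serdes.String(), Serdes.Long()));

        stream.repartition(Repartitioned.with(Serdes.String(), Serdes.Long()))
              .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
    }
}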

View File

@ -1763,32 +1763,23 @@
streams/tables of a join &#8211; it is up to the user to ensure that this is the case.</p> streams/tables of a join &#8211; it is up to the user to ensure that this is the case.</p>
</div> </div>
<p><strong>Ensuring data co-partitioning:</strong> If the inputs of a join are not co-partitioned yet, you must ensure this manually. <p><strong>Ensuring data co-partitioning:</strong> If the inputs of a join are not co-partitioned yet, you must ensure this manually.
You may follow a procedure such as outlined below.</p> You may follow a procedure such as outlined below.
It is recommended to repartition the topic with fewer partitions to match the larger partition number to avoid bottlenecks.
Technically, it would also be possible to repartition the topic with more partitions down to the smaller partition number.
For stream-table joins, it's recommended to repartition the KStream because repartitioning a KTable might result in a second state store.
For table-table joins, you might also consider the size of the KTables and repartition the smaller KTable (see the sketch after the steps below).</p>
<ol class="arabic"> <ol class="arabic">
<li><p class="first">Identify the input KStream/KTable in the join whose underlying Kafka topic has the smaller number of partitions. <li><p class="first">Identify the input KStream/KTable in the join whose underlying Kafka topic has the smaller number of partitions.
Let&#8217;s call this stream/table &#8220;SMALLER&#8221;, and the other side of the join &#8220;LARGER&#8221;. To learn about the number of Let&#8217;s call this stream/table &#8220;SMALLER&#8221;, and the other side of the join &#8220;LARGER&#8221;. To learn about the number of
partitions of a Kafka topic you can use, for example, the CLI tool <code class="docutils literal"><span class="pre">bin/kafka-topics</span></code> with the <code class="docutils literal"><span class="pre">--describe</span></code> partitions of a Kafka topic you can use, for example, the CLI tool <code class="docutils literal"><span class="pre">bin/kafka-topics</span></code> with the <code class="docutils literal"><span class="pre">--describe</span></code>
option.</p> option.</p>
</li> </li>
<li><p class="first">Pre-create a new Kafka topic for &#8220;SMALLER&#8221; that has the same number of partitions as &#8220;LARGER&#8221;. Let&#8217;s call this <li><p class="first">Within your application, re-partition the data of &#8220;SMALLER&#8221;. You must ensure that, when repartitioning
new topic &#8220;repartitioned-topic-for-smaller&#8221;. Typically, you&#8217;d use the CLI tool <code class="docutils literal"><span class="pre">bin/kafka-topics</span></code> with the the data with <code class="docutils literal"><span class="pre">repartition</span></code>, the same partitioner is used as for &#8220;LARGER&#8221;.</p>
<code class="docutils literal"><span class="pre">--create</span></code> option for this.</p>
</li>
<li><p class="first">Within your application, re-write the data of &#8220;SMALLER&#8221; into the new Kafka topic. You must ensure that, when writing
the data with <code class="docutils literal"><span class="pre">to</span></code> or <code class="docutils literal"><span class="pre">through</span></code>, the same partitioner is used as for &#8220;LARGER&#8221;.</p>
<blockquote> <blockquote>
<div><ul class="simple"> <div><ul class="simple">
<li>If &#8220;SMALLER&#8221; is a KStream: <code class="docutils literal"><span class="pre">KStream#to(&quot;repartitioned-topic-for-smaller&quot;)</span></code>.</li> <li>If &#8220;SMALLER&#8221; is a KStream: <code class="docutils literal"><span class="pre">KStream#repartition(Repartitioned.numberOfPartitions(...))</span></code>.</li>
<li>If &#8220;SMALLER&#8221; is a KTable: <code class="docutils literal"><span class="pre">KTable#to(&quot;repartitioned-topic-for-smaller&quot;)</span></code>.</li> <li>If &#8220;SMALLER&#8221; is a KTable: <code class="docutils literal"><span class="pre">KTable#toStream().repartition(Repartitioned.numberOfPartitions(...)).toTable()</span></code>.</li>
</ul>
</div></blockquote>
</li>
<li><p class="first">Within your application, re-read the data in &#8220;repartitioned-topic-for-smaller&#8221; into
a new KStream/KTable.</p>
<blockquote>
<div><ul class="simple">
<li>If &#8220;SMALLER&#8221; is a KStream: <code class="docutils literal"><span class="pre">StreamsBuilder#stream(&quot;repartitioned-topic-for-smaller&quot;)</span></code>.</li>
<li>If &#8220;SMALLER&#8221; is a KTable: <code class="docutils literal"><span class="pre">StreamsBuilder#table(&quot;repartitioned-topic-for-smaller&quot;)</span></code>.</li>
</ul> </ul>
</div></blockquote> </div></blockquote>
</li> </li>
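Putting the revised procedure together, a hedged sketch of repartitioning "SMALLER" via repartition() before a stream-stream join; the topic names, partition count, and join window are assumptions for illustration, not part of this patch:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class CoPartitioningExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // "larger-topic" is assumed to have 12 partitions, "smaller-topic" fewer.
        final KStream<String, Long> larger = builder.stream("larger-topic",
            Consumed.with(Serdes.String(), Serdes.Long()));
        final KStream<String, Long> smaller = builder.stream("smaller-topic",
            Consumed.with(Serdes.String(), Serdes.Long()));

        // Repartition "SMALLER" to LARGER's partition count; Kafka Streams manages the topic.
        final KStream<String, Long> repartitioned = smaller.repartition(
            Repartitioned.<String, Long>with(Serdes.String(), Serdes.Long())
                .withNumberOfPartitions(12));

        // With co-partitioned inputs, the join can be performed directly.
        final KStream<String, Long> joined = larger.join(
            repartitioned,
            (v1, v2) -> v1 + v2,
            JoinWindows.of(Duration.ofMinutes(5)),
            StreamJoined.with(Serdes.String(), Serdes.Long(), Serdes.Long()));

        joined.to("joined-output", Produced.with(Serdes.String(), Serdes.Long()));
    }
}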
@ -3679,58 +3670,6 @@ groupedTable
<span class="c1">// Write the stream to the output topic, using explicit key and value serdes,</span> <span class="c1">// Write the stream to the output topic, using explicit key and value serdes,</span>
<span class="c1">// (thus overriding the defaults in the config properties).</span> <span class="c1">// (thus overriding the defaults in the config properties).</span>
<span class="n">stream</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;my-stream-output-topic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span> <span class="n">stream</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;my-stream-output-topic&quot;</span><span class="o">,</span> <span class="n">Produced</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">());</span>
</pre></div>
</div>
<p><strong>Causes data re-partitioning if any of the following conditions is true:</strong></p>
<ol class="last arabic simple">
<li>If the output topic has a different number of partitions than the stream/table.</li>
<li>If the <code class="docutils literal"><span class="pre">KStream</span></code> was marked for re-partitioning.</li>
<li>If you provide a custom <code class="docutils literal"><span class="pre">StreamPartitioner</span></code> to explicitly control how to distribute the output records
across the partitions of the output topic.</li>
<li>If the key of an output record is <code class="docutils literal"><span class="pre">null</span></code>.</li>
</ol>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Through</strong></p>
<ul class="last simple">
<li>KStream -&gt; KStream</li>
<li>KTable -&gt; KTable</li>
</ul>
</td>
<td><p class="first">Write the records to a Kafka topic and create a new stream/table from that topic.
Essentially a shorthand for <code class="docutils literal"><span class="pre">KStream#to()</span></code> followed by <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code>, same for tables.
(<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#through(java.lang.String)">KStream details</a>)</p>
<p>When to provide SerDes explicitly:</p>
<ul class="simple">
<li>If you do not specify SerDes explicitly, the default SerDes from the
<a class="reference internal" href="config-streams.html#streams-developer-guide-configuration"><span class="std std-ref">configuration</span></a> are used.</li>
<li>You <strong>must specify SerDes explicitly</strong> if the key and/or value types of the <code class="docutils literal"><span class="pre">KStream</span></code> or <code class="docutils literal"><span class="pre">KTable</span></code> do not
match the configured default SerDes.</li>
<li>See <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a> for information about configuring default SerDes, available SerDes,
and implementing your own custom SerDes.</li>
</ul>
<p>A variant of <code class="docutils literal"><span class="pre">through</span></code> exists that enables you to specify how the data is produced by using a <code class="docutils literal"><span class="pre">Produced</span></code>
instance to specify, for example, a <code class="docutils literal"><span class="pre">StreamPartitioner</span></code> that gives you control over
how output records are distributed across the partitions of the output topic.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Variant 1: Imagine that your application needs to continue reading and processing</span>
<span class="c1">// the records after they have been written to a topic via ``to()``. Here, one option</span>
<span class="c1">// is to write to an output topic, then read from the same topic by constructing a</span>
<span class="c1">// new stream from it, and then begin processing it (here: via `map`, for example).</span>
<span class="n">stream</span><span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">&quot;my-stream-output-topic&quot;</span><span class="o">);</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">newStream</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">stream</span><span class="o">(</span><span class="s">&quot;my-stream-output-topic&quot;</span><span class="o">).</span><span class="na">map</span><span class="o">(...);</span>
<span class="c1">// Variant 2 (better): Since the above is a common pattern, the DSL provides the</span>
<span class="c1">// convenience method ``through`` that is equivalent to the code above.</span>
<span class="c1">// Note that you may need to specify key and value serdes explicitly, which is</span>
<span class="c1">// not shown in this simple example.</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">newStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">through</span><span class="o">(</span><span class="s">&quot;user-clicks-topic&quot;</span><span class="o">).</span><span class="na">map</span><span class="o">(...);</span>
<span class="c1">// ``through`` is also available for tables</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">newTable</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">through</span><span class="o">(</span><span class="s">&quot;my-table-output-topic&quot;</span><span class="o">).</span><span class="na">map</span><span class="o">(...);</span>
</pre></div> </pre></div>
</div> </div>
<p><strong>Causes data re-partitioning if any of the following conditions is true:</strong></p> <p><strong>Causes data re-partitioning if any of the following conditions is true:</strong></p>
@ -3854,8 +3793,8 @@ object WordCountApplication extends App {
</div> </div>
<div class="section" id="scala-dsl-implicit-serdes"> <div class="section" id="scala-dsl-implicit-serdes">
<span id="streams-developer-guide-dsl-scala-dsl-implicit-serdes"></span><h3><a class="toc-backref" href="#id29">Implicit SerDes</a><a class="headerlink" href="#scala-dsl-implicit-serdes" title="Permalink to this headline"></a></h3> <span id="streams-developer-guide-dsl-scala-dsl-implicit-serdes"></span><h3><a class="toc-backref" href="#id29">Implicit SerDes</a><a class="headerlink" href="#scala-dsl-implicit-serdes" title="Permalink to this headline"></a></h3>
<p>One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code>. And the user has to supply them every time through the with function of these classes.</p> <p>One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Repartitioned</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code>. And the user has to supply them every time through the with function of these classes.</p>
<p>The library uses the power of <a href="https://docs.scala-lang.org/tour/implicit-parameters.html">Scala implicit parameters</a> to alleviate this concern. As a user you can provide implicit SerDes or implicit values of <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> available in scope.</p> <p>The library uses the power of <a href="https://docs.scala-lang.org/tour/implicit-parameters.html">Scala implicit parameters</a> to alleviate this concern. As a user you can provide implicit SerDes or implicit values of <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Repartitioned</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of <code class="docutils literal"><span class="pre">Grouped</span></code>, <code class="docutils literal"><span class="pre">Produced</span></code>, <code class="docutils literal"><span class="pre">Consumed</span></code> or <code class="docutils literal"><span class="pre">Joined</span></code> available in scope.</p>
<p>The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope. A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).</p> <p>The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope. A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section).</p>
<p>Here's an example:</p> <p>Here's an example:</p>
<pre class="brush: scala;"> <pre class="brush: scala;">

View File

@ -49,8 +49,7 @@
<dd>Topics that are specified via sink processors in the application&#8217;s topology; e.g. via <dd>Topics that are specified via sink processors in the application&#8217;s topology; e.g. via
<code class="docutils literal"><span class="pre">KStream#to()</span></code>, <code class="docutils literal"><span class="pre">KTable.to()</span></code> and <code class="docutils literal"><span class="pre">Topology#addSink()</span></code>.</dd> <code class="docutils literal"><span class="pre">KStream#to()</span></code>, <code class="docutils literal"><span class="pre">KTable.to()</span></code> and <code class="docutils literal"><span class="pre">Topology#addSink()</span></code>.</dd>
<dt>Intermediate topics</dt> <dt>Intermediate topics</dt>
<dd>Topics that are both input and output topics of the application&#8217;s topology; e.g. via <dd>Topics that are both input and output topics of the application&#8217;s topology.</dd>
<code class="docutils literal"><span class="pre">KStream#through()</span></code>.</dd>
</dl> </dl>
<p>User topics must be created and manually managed ahead of time (e.g., via the <p>User topics must be created and manually managed ahead of time (e.g., via the
<a class="reference internal" href="../../kafka/post-deployment.html#kafka-operations-admin"><span class="std std-ref">topic tools</span></a>). If user topics are shared among multiple applications for reading and <a class="reference internal" href="../../kafka/post-deployment.html#kafka-operations-admin"><span class="std std-ref">topic tools</span></a>). If user topics are shared among multiple applications for reading and

View File

@ -95,9 +95,11 @@
Note that you need brokers with version 2.5 or newer to use this feature. Note that you need brokers with version 2.5 or newer to use this feature.
</p> </p>
<p> <p>
As of 2.6.0 Kafka Streams offers a new <code>KStream.repartition()</code> operator (as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>). As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in favor of the new <code>KStream.repartition()</code> operator
(as per <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint">KIP-221</a>).
<code>KStream.repartition()</code> is similar to <code>KStream.through()</code>, however Kafka Streams will manage the topic for you. <code>KStream.repartition()</code> is similar to <code>KStream.through()</code>, however Kafka Streams will manage the topic for you.
Refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details. If you need to write into and read back from a topic that you manage, you can fall back to using <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.
</p> </p>
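To illustrate the migration this note describes, a hedged before/after sketch; the topic name, repartition name, and partition count are assumptions, not taken from this change:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class ThroughToRepartitionExample {
    @SuppressWarnings("deprecation")
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream = builder.stream("input-topic");

        // Before (deprecated in 2.6): write to and re-read from a topic that you manage yourself.
        final KStream<String, String> viaThrough = stream.through("user-managed-topic");

        // After: let Kafka Streams create and manage the repartition topic.
        final KStream<String, String> viaRepartition = stream.repartition(
            Repartitioned.<String, String>as("rekeyed").withNumberOfPartitions(6));
    }
}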
<h3><a id="streams_api_changes_250" href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3> <h3><a id="streams_api_changes_250" href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.kafka.streams; package org.apache.kafka.streams;
import java.util.LinkedList;
import java.util.TreeMap;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -43,7 +41,7 @@ import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics; import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
@ -52,7 +50,6 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StateDirectory;
@ -79,10 +76,12 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -1082,7 +1081,7 @@ public class KafkaStreams implements AutoCloseable {
* This will use the default Kafka Streams partitioner to locate the partition. * This will use the default Kafka Streams partitioner to locate the partition.
* If a {@link StreamPartitioner custom partitioner} has been * If a {@link StreamPartitioner custom partitioner} has been
* {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or
* {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input * {@link KStream#repartition(Repartitioned)}, or if the original {@link KTable}'s input
* {@link StreamsBuilder#table(String) topic} is partitioned differently, please use * {@link StreamsBuilder#table(String) topic} is partitioned differently, please use
* {@link #metadataForKey(String, Object, StreamPartitioner)}. * {@link #metadataForKey(String, Object, StreamPartitioner)}.
* <p> * <p>

View File

@ -43,9 +43,9 @@ public interface CogroupedKStream<K, VOut> {
* <p> * <p>
* The added {@link KGroupedStream grouped KStream} must have the same number of partitions as all existing * The added {@link KGroupedStream grouped KStream} must have the same number of partitions as all existing
* streams of this {@code CogroupedKStream}. * streams of this {@code CogroupedKStream}.
* If this is not the case, you would need to call {@link KStream#through(String)} before * If this is not the case, you would need to call {@link KStream#repartition(Repartitioned)} before
* {@link KStream#groupByKey() grouping} the {@link KStream}, using a pre-created topic with the "correct" number of * {@link KStream#groupByKey() grouping} the {@link KStream} and specify the "correct" number of
* partitions. * partitions via the {@link Repartitioned} parameter.
* <p> * <p>
* The specified {@link Aggregator} is applied in the actual {@link #aggregate(Initializer) aggregation} step for * The specified {@link Aggregator} is applied in the actual {@link #aggregate(Initializer) aggregation} step for
* each input record and computes a new aggregate using the current aggregate (or for the very first record per key * each input record and computes a new aggregate using the current aggregate (or for the very first record per key
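For reference, a hedged sketch of what the updated Javadoc suggests: repartitioning one input via repartition() before grouping and cogrouping; topic names, partition count, and aggregators are assumptions for illustration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class CogroupRepartitionExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KGroupedStream<String, Long> clicks = builder
            .stream("clicks", Consumed.with(Serdes.String(), Serdes.Long()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()));

        // "views" is assumed to have a different partition count, so repartition it before
        // grouping to satisfy the co-partitioning requirement of cogroup().
        final KGroupedStream<String, Long> views = builder
            .stream("views", Consumed.with(Serdes.String(), Serdes.Long()))
            .repartition(Repartitioned.<String, Long>with(Serdes.String(), Serdes.Long())
                .withNumberOfPartitions(6))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()));

        final KTable<String, Long> totals = clicks
            .cogroup((String key, Long value, Long aggregate) -> aggregate + value)
            .cogroup(views, (String key, Long value, Long aggregate) -> aggregate + value)
            .aggregate(() -> 0L, Materialized.with(Serdes.String(), Serdes.Long()));
    }
}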

View File

@ -815,9 +815,10 @@ public interface KStream<K, V> {
* *
* @param topic the topic name * @param topic the topic name
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream} * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
* @see #repartition() * @deprecated since 2.6; use {@link #repartition()} instead
* @see #repartition(Repartitioned)
*/ */
// TODO: when removed, update `StreamsResetter` description of --intermediate-topics
@Deprecated
KStream<K, V> through(final String topic); KStream<K, V> through(final String topic);
/** /**
@ -835,9 +836,9 @@ public interface KStream<K, V> {
* @param topic the topic name * @param topic the topic name
* @param produced the options to use when producing to the topic * @param produced the options to use when producing to the topic
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream} * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
* @see #repartition() * @deprecated since 2.6; use {@link #repartition(Repartitioned)} instead
* @see #repartition(Repartitioned)
*/ */
@Deprecated
KStream<K, V> through(final String topic, KStream<K, V> through(final String topic,
final Produced<K, V> produced); final Produced<K, V> produced);
@ -846,7 +847,6 @@ public interface KStream<K, V> {
* from the auto-generated topic using default serializers, deserializers, and producer's {@link DefaultPartitioner}. * from the auto-generated topic using default serializers, deserializers, and producer's {@link DefaultPartitioner}.
* The number of partitions is determined based on the upstream topics partition numbers. * The number of partitions is determined based on the upstream topics partition numbers.
* <p> * <p>
* This operation is similar to {@link #through(String)}, however, Kafka Streams manages the used topic automatically.
* The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
* Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
* The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
@ -854,8 +854,6 @@ public interface KStream<K, V> {
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
* *
* @return {@code KStream} that contains the exact same repartitioned records as this {@code KStream}. * @return {@code KStream} that contains the exact same repartitioned records as this {@code KStream}.
* @see #through(String)
* @see #through(String, Produced)
*/ */
KStream<K, V> repartition(); KStream<K, V> repartition();
@ -864,7 +862,6 @@ public interface KStream<K, V> {
* from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner}, * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
* number of partitions, and topic name part as defined by {@link Repartitioned}. * number of partitions, and topic name part as defined by {@link Repartitioned}.
* <p> * <p>
* This operation is similar to {@link #through(String)}, however, Kafka Streams manages the used topic automatically.
* The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance. * The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
* Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams. * Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
* The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
@ -876,8 +873,6 @@ public interface KStream<K, V> {
* {@link StreamPartitioner} which determines how records are distributed among partitions of the topic, * {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
* part of the topic name, and number of partitions for a repartition topic. * part of the topic name, and number of partitions for a repartition topic.
* @return a {@code KStream} that contains the exact same repartitioned records as this {@code KStream}. * @return a {@code KStream} that contains the exact same repartitioned records as this {@code KStream}.
* @see #through(String)
* @see #through(String, Produced)
*/ */
KStream<K, V> repartition(final Repartitioned<K, V> repartitioned); KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);
@ -925,9 +920,8 @@ public interface KStream<K, V> {
* Convert this stream to a {@link KTable}. * Convert this stream to a {@link KTable}.
* <p> * <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
* {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
@ -952,9 +946,8 @@ public interface KStream<K, V> {
* Convert this stream to a {@link KTable}. * Convert this stream to a {@link KTable}.
* <p> * <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
* {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
@ -980,9 +973,8 @@ public interface KStream<K, V> {
* Convert this stream to a {@link KTable}. * Convert this stream to a {@link KTable}.
* <p> * <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
* {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
@ -1009,9 +1001,8 @@ public interface KStream<K, V> {
* Convert this stream to a {@link KTable}. * Convert this stream to a {@link KTable}.
* <p> * <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
* {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
@ -1141,10 +1132,9 @@ public interface KStream<K, V> {
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}. * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p> * <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic may need to be created in
* {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later * Kafka if a later operator depends on the newly selected key.
* operator depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
@ -1171,10 +1161,9 @@ public interface KStream<K, V> {
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}. * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p> * <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic may need to be created in
* {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka * Kafka if a later operator depends on the newly selected key.
* if a later operator depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix. * "&lt;name&gt;" is an internally generated name, and "-repartition" is a fixed suffix.
@ -1202,10 +1191,9 @@ public interface KStream<K, V> {
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}. * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p> * <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)}, * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via * {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic may need to be created in
* {@link #through(String)}) an internal repartitioning topic may need to be created in Kafka if a later operator * Kafka if a later operator depends on the newly selected key.
* depends on the newly selected key.
* This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in * This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* &lt;name&gt; is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally * &lt;name&gt; is either provided via {@link org.apache.kafka.streams.kstream.Grouped#as(String)} or an internally
@ -1262,8 +1250,8 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* join, using a pre-created topic with the "correct" number of partitions. * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
@ -1339,8 +1327,8 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* join, using a pre-created topic with the "correct" number of partitions. * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
@ -1421,8 +1409,8 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* join, using a pre-created topic with the "correct" number of partitions. * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
@ -1504,8 +1492,8 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* join, using a pre-created topic with the "correct" number of partitions. * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
@ -1585,8 +1573,8 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* join, using a pre-created topic with the "correct" number of partitions. * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
@ -1671,8 +1659,8 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* join, using a pre-created topic with the "correct" number of partitions. * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
@ -1755,8 +1743,8 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* join, using a pre-created topic with the "correct" number of partitions. * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
@ -1837,8 +1825,8 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* join, using a pre-created topic with the "correct" number of partitions. * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
@ -1924,8 +1912,8 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the * If this is not the case, you would need to call {@link #repartition(Repartitioned)} (for one input stream) before
* join, using a pre-created topic with the "correct" number of partitions. * doing the join and specify the "correct" number of partitions via {@link Repartitioned} parameter.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner).
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
* internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
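As a hedged illustration of the pre-repartitioning pattern the updated Javadoc describes (topic names, serdes, the class name, and the partition count of 8 are hypothetical, not part of this commit), one input of a stream-stream join can be brought to the other input's partition count via repartition(Repartitioned):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined;

public class PreRepartitionJoinExample {
    // Hypothetical setup: "left" has 4 partitions, "right" has 8, so "left" is repartitioned to 8 first.
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left =
            builder.stream("left", Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> right =
            builder.stream("right", Consumed.with(Serdes.String(), Serdes.String()));

        left.repartition(Repartitioned.<String, String>as("left-by-key")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String())
                .withNumberOfPartitions(8)) // match the partition count of "right"
            .join(right,
                (leftValue, rightValue) -> leftValue + "/" + rightValue,
                JoinWindows.of(Duration.ofMinutes(5)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to("joined-output");
        return builder.build();
    }
}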
@ -2010,8 +1998,9 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} for this {@code KStream} before doing * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* the join, using a pre-created topic with the same number of partitions as the given {@link KTable}. * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
@ -2085,8 +2074,9 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} for this {@code KStream} before doing * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* the join, using a pre-created topic with the same number of partitions as the given {@link KTable}. * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
@ -2166,8 +2156,9 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} for this {@code KStream} before doing * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* the join, using a pre-created topic with the same number of partitions as the given {@link KTable}. * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
@ -2244,8 +2235,9 @@ public interface KStream<K, V> {
* </table> * </table>
* Both input streams (or to be more precise, their underlying source topics) need to have the same number of * Both input streams (or to be more precise, their underlying source topics) need to have the same number of
* partitions. * partitions.
* If this is not the case, you would need to call {@link #through(String)} for this {@code KStream} before doing * If this is not the case, you would need to call {@link #repartition(Repartitioned)} for this {@code KStream}
* the join, using a pre-created topic with the same number of partitions as the given {@link KTable}. * before doing the join, specifying the same number of partitions via {@link Repartitioned} parameter as the given
* {@link KTable}.
* Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner);
* cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
* If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
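Similarly, a minimal sketch (hypothetical topics, class name, and a hypothetical partition count of 6) of repartitioning a KStream to the partition count of the KTable it is joined with, as the updated Javadoc suggests:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Repartitioned;

public class StreamTableJoinExample {
    // Hypothetical setup: "user-profiles" (the table's source topic) has 6 partitions,
    // so the stream is repartitioned to 6 partitions before the join.
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> clicks =
            builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> profiles =
            builder.table("user-profiles", Consumed.with(Serdes.String(), Serdes.String()));

        clicks.repartition(Repartitioned.<String, String>numberOfPartitions(6)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()))
            .join(profiles, (click, profile) -> click + "@" + profile)
            .to("enriched-clicks");
        return builder.build();
    }
}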
@ -2481,8 +2473,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code transform()}.
* {@code transform()}.
* <p> * <p>
* Transforming records might result in an internal data redistribution if a key based operator (like an aggregation * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
* or join) is applied to the result {@code KStream}. * or join) is applied to the result {@code KStream}.
@ -2575,8 +2566,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code transform()}.
* {@code transform()}.
* <p> * <p>
* Transforming records might result in an internal data redistribution if a key based operator (like an aggregation * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
* or join) is applied to the result {@code KStream}. * or join) is applied to the result {@code KStream}.
@ -2674,7 +2664,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransform()}. * {@code flatTransform()}.
* <p> * <p>
* Transforming records might result in an internal data redistribution if a key based operator (like an aggregation * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
@ -2767,7 +2757,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransform()}. * {@code flatTransform()}.
* <p> * <p>
* Transforming records might result in an internal data redistribution if a key based operator (like an aggregation * Transforming records might result in an internal data redistribution if a key based operator (like an aggregation
@ -2856,7 +2846,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code transformValues()}. * {@code transformValues()}.
* <p> * <p>
* Setting a new value preserves data co-location with respect to the key. * Setting a new value preserves data co-location with respect to the key.
@ -2932,7 +2922,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code transformValues()}. * {@code transformValues()}.
* <p> * <p>
* Setting a new value preserves data co-location with respect to the key. * Setting a new value preserves data co-location with respect to the key.
@ -3013,7 +3003,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code transformValues()}. * {@code transformValues()}.
* <p> * <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
@ -3093,7 +3083,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code transformValues()}. * {@code transformValues()}.
* <p> * <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
@ -3181,7 +3171,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransformValues()}. * {@code flatTransformValues()}.
* <p> * <p>
* Setting a new value preserves data co-location with respect to the key. * Setting a new value preserves data co-location with respect to the key.
@ -3270,7 +3260,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransformValues()}. * {@code flatTransformValues()}.
* <p> * <p>
* Setting a new value preserves data co-location with respect to the key. * Setting a new value preserves data co-location with respect to the key.
@ -3361,7 +3351,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransformValues()}. * {@code flatTransformValues()}.
* <p> * <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
@ -3451,7 +3441,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String) through()} should be performed before * If repartitioning is required, a call to {@link #repartition()} should be performed before
* {@code flatTransformValues()}. * {@code flatTransformValues()}.
* <p> * <p>
* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
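A hedged sketch of the placement this Javadoc family describes: an explicit repartition() between a key-changing operation and transform(), since transform() does not trigger auto-repartitioning (topic names, the class name, and the trivial transformer are hypothetical):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class RepartitionBeforeTransformExample {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical transformer that just upper-cases the value; transform() is used here
        // only to show where the explicit repartition() call belongs.
        final TransformerSupplier<String, String, KeyValue<String, String>> upperCaser = () ->
            new Transformer<String, String, KeyValue<String, String>>() {
                @Override
                public void init(final ProcessorContext context) { }

                @Override
                public KeyValue<String, String> transform(final String key, final String value) {
                    return KeyValue.pair(key, value.toUpperCase());
                }

                @Override
                public void close() { }
            };

        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
            // key-changing step: transform() does NOT trigger auto-repartitioning,
            // so repartition explicitly to co-locate records by the new key
            .selectKey((orderId, order) -> order.split(",")[0])
            .repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
            .transform(upperCaser)
            .to("orders-by-customer");

        return builder.build();
    }
}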
@ -3528,7 +3518,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}. * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
* *
* @param processorSupplier an instance of {@link ProcessorSupplier} that generates a {@link Processor} * @param processorSupplier an instance of {@link ProcessorSupplier} that generates a {@link Processor}
* @param stateStoreNames the names of the state store used by the processor * @param stateStoreNames the names of the state store used by the processor
@ -3590,7 +3580,7 @@ public interface KStream<K, V> {
* } * }
* }</pre> * }</pre>
* Even if any upstream operation was key-changing, no auto-repartition is triggered. * Even if any upstream operation was key-changing, no auto-repartition is triggered.
* If repartitioning is required, a call to {@link #through(String)} should be performed before {@code transform()}. * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}.
* *
* @param processorSupplier an instance of {@link ProcessorSupplier} that generates a {@link Processor} * @param processorSupplier an instance of {@link ProcessorSupplier} that generates a {@link Processor}
* @param named a {@link Named} config used to name the processor in the topology * @param named a {@link Named} config used to name the processor in the topology
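Since through() is deprecated by this commit, a hedged migration sketch (topic, variable, and class names are hypothetical) showing the two replacement paths also used in the test changes further down: repartition() when an internally managed repartition topic is acceptable, or an explicit to()/stream() pair when a pre-created user topic must be kept:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;

public class ThroughMigrationExample {
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input =
            builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));

        // Before (deprecated): write to and re-read from a pre-created user topic.
        // final KStream<String, String> viaThrough = input.through("my-topic");

        // Option 1: let Kafka Streams manage an internal "<application.id>-my-topic-repartition" topic.
        final KStream<String, String> viaRepartition =
            input.repartition(Repartitioned.as("my-topic"));

        // Option 2: keep using the pre-created user topic "my-topic" explicitly.
        input.to("my-topic", Produced.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> viaUserTopic =
            builder.stream("my-topic", Consumed.with(Serdes.String(), Serdes.String()));

        viaRepartition.to("output-1");
        viaUserTopic.to("output-2");
        return builder.build();
    }
}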

View File

@ -532,11 +532,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder); builder);
} }
@Deprecated
@Override @Override
public KStream<K, V> through(final String topic) { public KStream<K, V> through(final String topic) {
return through(topic, Produced.with(keySerde, valSerde, null)); return through(topic, Produced.with(keySerde, valSerde, null));
} }
@Deprecated
@Override @Override
public KStream<K, V> through(final String topic, public KStream<K, V> through(final String topic,
final Produced<K, V> produced) { final Produced<K, V> produced) {

View File

@ -21,9 +21,9 @@ import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
class RepartitionedInternal<K, V> extends Repartitioned<K, V> { public class RepartitionedInternal<K, V> extends Repartitioned<K, V> {
RepartitionedInternal(final Repartitioned<K, V> repartitioned) { public RepartitionedInternal(final Repartitioned<K, V> repartitioned) {
super(repartitioned); super(repartitioned);
} }
@ -31,23 +31,23 @@ class RepartitionedInternal<K, V> extends Repartitioned<K, V> {
return new InternalTopicProperties(numberOfPartitions()); return new InternalTopicProperties(numberOfPartitions());
} }
String name() { public String name() {
return name; return name;
} }
Serde<K> keySerde() { public Serde<K> keySerde() {
return keySerde; return keySerde;
} }
Serde<V> valueSerde() { public Serde<V> valueSerde() {
return valueSerde; return valueSerde;
} }
StreamPartitioner<K, V> streamPartitioner() { public StreamPartitioner<K, V> streamPartitioner() {
return partitioner; return partitioner;
} }
Integer numberOfPartitions() { public Integer numberOfPartitions() {
return numberOfPartitions; return numberOfPartitions;
} }
} }

View File

@ -265,6 +265,7 @@ public class StreamsBuilderTest {
processorSupplier.theCapturedProcessor().processed); processorSupplier.theCapturedProcessor().processed);
} }
@Deprecated
@Test @Test
public void shouldProcessViaThroughTopic() { public void shouldProcessViaThroughTopic() {
final KStream<String, String> source = builder.stream("topic-source"); final KStream<String, String> source = builder.stream("topic-source");
@ -286,6 +287,27 @@ public class StreamsBuilderTest {
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed); assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
} }
@Test
public void shouldProcessViaRepartitionTopic() {
final KStream<String, String> source = builder.stream("topic-source");
final KStream<String, String> through = source.repartition();
final MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
source.process(sourceProcessorSupplier);
final MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
through.process(throughProcessorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic.pipeInput("A", "aa");
}
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed);
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
}
@Test @Test
public void shouldMergeStreams() { public void shouldMergeStreams() {
final String topic1 = "topic-1"; final String topic1 = "topic-1";

View File

@ -292,14 +292,16 @@ public abstract class AbstractResetIntegrationTest {
cleanGlobal(false, null, null); cleanGlobal(false, null, null);
} }
void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { void testReprocessingFromScratchAfterResetWithIntermediateUserTopic(final boolean useRepartitioned) throws Exception {
if (!useRepartitioned) {
cluster.createTopic(INTERMEDIATE_USER_TOPIC); cluster.createTopic(INTERMEDIATE_USER_TOPIC);
}
appID = testId + "-from-scratch-with-intermediate-topic"; appID = testId + "-from-scratch-with-intermediate-topic";
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
// RUN // RUN
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfig); streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2), streamsConfig);
streams.start(); streams.start();
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
// receive only first values to make sure intermediate user topic is not consumed completely // receive only first values to make sure intermediate user topic is not consumed completely
@ -312,19 +314,21 @@ public abstract class AbstractResetIntegrationTest {
// insert bad record to make sure intermediate user topic gets seekToEnd() // insert bad record to make sure intermediate user topic gets seekToEnd()
mockTime.sleep(1); mockTime.sleep(1);
final KeyValue<Long, String> badMessage = new KeyValue<>(-1L, "badRecord-ShouldBeSkipped"); final KeyValue<Long, String> badMessage = new KeyValue<>(-1L, "badRecord-ShouldBeSkipped");
if (!useRepartitioned) {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
INTERMEDIATE_USER_TOPIC, INTERMEDIATE_USER_TOPIC,
Collections.singleton(badMessage), Collections.singleton(badMessage),
producerConfig, producerConfig,
mockTime.milliseconds()); mockTime.milliseconds());
}
// RESET // RESET
streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfig); streams = new KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned, OUTPUT_TOPIC_2_RERUN), streamsConfig);
streams.cleanUp(); streams.cleanUp();
cleanGlobal(true, null, null); cleanGlobal(!useRepartitioned, null, null);
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC); assertInternalTopicsGotDeleted(useRepartitioned ? null : INTERMEDIATE_USER_TOPIC);
// RE-RUN // RE-RUN
streams.start(); streams.start();
@ -335,6 +339,7 @@ public abstract class AbstractResetIntegrationTest {
assertThat(resultRerun, equalTo(result)); assertThat(resultRerun, equalTo(result));
assertThat(resultRerun2, equalTo(result2)); assertThat(resultRerun2, equalTo(result2));
if (!useRepartitioned) {
final Properties props = TestUtils.consumerConfig(cluster.bootstrapServers(), testId + "-result-consumer", LongDeserializer.class, StringDeserializer.class, commonClientConfig); final Properties props = TestUtils.consumerConfig(cluster.bootstrapServers(), testId + "-result-consumer", LongDeserializer.class, StringDeserializer.class, commonClientConfig);
final List<KeyValue<Long, String>> resultIntermediate = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(props, INTERMEDIATE_USER_TOPIC, 21); final List<KeyValue<Long, String>> resultIntermediate = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(props, INTERMEDIATE_USER_TOPIC, 21);
@ -342,12 +347,15 @@ public abstract class AbstractResetIntegrationTest {
assertThat(resultIntermediate.get(i), equalTo(resultIntermediate.get(i + 11))); assertThat(resultIntermediate.get(i), equalTo(resultIntermediate.get(i + 11)));
} }
assertThat(resultIntermediate.get(10), equalTo(badMessage)); assertThat(resultIntermediate.get(10), equalTo(badMessage));
}
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT); waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(true, null, null); cleanGlobal(!useRepartitioned, null, null);
if (!useRepartitioned) {
cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC); cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
} }
}
void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception { void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
appID = testId + "-from-file"; appID = testId + "-from-file";
@ -472,7 +480,8 @@ public abstract class AbstractResetIntegrationTest {
cleanGlobal(false, null, null); cleanGlobal(false, null, null);
} }
private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) { private Topology setupTopologyWithIntermediateTopic(final boolean useRepartitioned,
final String outputTopic2) {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> input = builder.stream(INPUT_TOPIC); final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
@ -484,8 +493,14 @@ public abstract class AbstractResetIntegrationTest {
.toStream() .toStream()
.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long())); .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
input.through(INTERMEDIATE_USER_TOPIC) final KStream<Long, String> stream;
.groupByKey() if (useRepartitioned) {
stream = input.repartition();
} else {
input.to(INTERMEDIATE_USER_TOPIC);
stream = builder.stream(INTERMEDIATE_USER_TOPIC);
}
stream.groupByKey()
.windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10))) .windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10)))
.count() .count()
.toStream() .toStream()

View File

@ -213,7 +213,8 @@ public class EosIntegrationTest {
final KStream<Long, Long> input = builder.stream(inputTopic); final KStream<Long, Long> input = builder.stream(inputTopic);
KStream<Long, Long> output = input; KStream<Long, Long> output = input;
if (throughTopic != null) { if (throughTopic != null) {
output = input.through(throughTopic); input.to(throughTopic);
output = builder.stream(throughTopic);
} }
output.to(outputTopic); output.to(outputTopic);

View File

@ -74,7 +74,12 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
@Test @Test
public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(); super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(false);
}
@Test
public void testReprocessingFromScratchAfterResetWithIntermediateInternalTopic() throws Exception {
super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(true);
} }
@Test @Test
@ -93,7 +98,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
} }
@Test @Test
public void shouldNotAllowToResetWhileStreamsRunning() throws Exception { public void shouldNotAllowToResetWhileStreamsRunning() {
super.shouldNotAllowToResetWhileStreamsIsRunning(); super.shouldNotAllowToResetWhileStreamsIsRunning();
} }

View File

@ -42,7 +42,7 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
private static final String TEST_ID = "reset-with-ssl-integration-test"; private static final String TEST_ID = "reset-with-ssl-integration-test";
private static Map<String, Object> sslConfig; private static final Map<String, Object> SSL_CONFIG;
static { static {
final Properties brokerProps = new Properties(); final Properties brokerProps = new Properties();
@ -52,11 +52,11 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
brokerProps.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L); brokerProps.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
try { try {
sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); SSL_CONFIG = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
brokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0"); brokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
brokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL"); brokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL");
brokerProps.putAll(sslConfig); brokerProps.putAll(SSL_CONFIG);
} catch (final Exception e) { } catch (final Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -66,7 +66,7 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
@Override @Override
Map<String, Object> getClientSslConfig() { Map<String, Object> getClientSslConfig() {
return sslConfig; return SSL_CONFIG;
} }
@Before @Before
@ -88,6 +88,11 @@ public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
@Test @Test
public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(); super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(false);
}
@Test
public void testReprocessingFromScratchAfterResetWithIntermediateInternalTopic() throws Exception {
super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(true);
} }
} }

View File

@ -41,6 +41,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.TransformerSupplier;
@ -556,6 +557,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("named can't be null")); assertThat(exception.getMessage(), equalTo("named can't be null"));
} }
@Deprecated // specifically testing the deprecated variant
@Test @Test
public void shouldNotAllowNullTopicOnThrough() { public void shouldNotAllowNullTopicOnThrough() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -564,6 +566,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("topic can't be null")); assertThat(exception.getMessage(), equalTo("topic can't be null"));
} }
@Deprecated // specifically testing the deprecated variant
@Test @Test
public void shouldNotAllowNullTopicOnThroughWithProduced() { public void shouldNotAllowNullTopicOnThroughWithProduced() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -572,6 +575,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("topic can't be null")); assertThat(exception.getMessage(), equalTo("topic can't be null"));
} }
@Deprecated // specifically testing the deprecated variant
@Test @Test
public void shouldNotAllowNullProducedOnThrough() { public void shouldNotAllowNullProducedOnThrough() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -588,6 +592,14 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("topic can't be null")); assertThat(exception.getMessage(), equalTo("topic can't be null"));
} }
@Test
public void shouldNotAllowNullRepartitionedOnRepartition() {
final NullPointerException exception = assertThrows(
NullPointerException.class,
() -> testStream.repartition(null));
assertThat(exception.getMessage(), equalTo("repartitioned can't be null"));
}
@Test @Test
public void shouldNotAllowNullTopicChooserOnTo() { public void shouldNotAllowNullTopicChooserOnTo() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -636,7 +648,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("keySelector can't be null")); assertThat(exception.getMessage(), equalTo("keySelector can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullSelectorOnGroupByWithSerialized() { public void shouldNotAllowNullSelectorOnGroupByWithSerialized() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -653,7 +665,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("keySelector can't be null")); assertThat(exception.getMessage(), equalTo("keySelector can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullSerializedOnGroupBy() { public void shouldNotAllowNullSerializedOnGroupBy() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -670,7 +682,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("grouped can't be null")); assertThat(exception.getMessage(), equalTo("grouped can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullSerializedOnGroupByKey() { public void shouldNotAllowNullSerializedOnGroupByKey() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -727,7 +739,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be null")); assertThat(exception.getMessage(), equalTo("otherStream can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullOtherStreamOnJoinWithJoined() { public void shouldNotAllowNullOtherStreamOnJoinWithJoined() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -760,7 +772,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullValueJoinerOnJoinWithJoined() { public void shouldNotAllowNullValueJoinerOnJoinWithJoined() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -793,7 +805,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullJoinWindowsOnJoinWithJoined() { public void shouldNotAllowNullJoinWindowsOnJoinWithJoined() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -818,7 +830,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullJoinedOnJoin() { public void shouldNotAllowNullJoinedOnJoin() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -851,7 +863,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be null")); assertThat(exception.getMessage(), equalTo("otherStream can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullOtherStreamOnLeftJoinWithJoined() { public void shouldNotAllowNullOtherStreamOnLeftJoinWithJoined() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -884,7 +896,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullValueJoinerOnLeftJoinWithJoined() { public void shouldNotAllowNullValueJoinerOnLeftJoinWithJoined() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -917,7 +929,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullJoinWindowsOnLeftJoinWithJoined() { public void shouldNotAllowNullJoinWindowsOnLeftJoinWithJoined() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -942,7 +954,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullJoinedOnLeftJoin() { public void shouldNotAllowNullJoinedOnLeftJoin() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -975,7 +987,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("otherStream can't be null")); assertThat(exception.getMessage(), equalTo("otherStream can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullOtherStreamOnOuterJoinWithJoined() { public void shouldNotAllowNullOtherStreamOnOuterJoinWithJoined() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -1008,7 +1020,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullValueJoinerOnOuterJoinWithJoined() { public void shouldNotAllowNullValueJoinerOnOuterJoinWithJoined() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -1041,7 +1053,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullJoinWindowsOnOuterJoinWithJoined() { public void shouldNotAllowNullJoinWindowsOnOuterJoinWithJoined() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -1066,7 +1078,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("windows can't be null")); assertThat(exception.getMessage(), equalTo("windows can't be null"));
} }
@SuppressWarnings("deprecation") @Deprecated
@Test @Test
public void shouldNotAllowNullJoinedOnOuterJoin() { public void shouldNotAllowNullJoinedOnOuterJoin() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
@ -1291,7 +1303,7 @@ public class KStreamImplTest {
assertThat(exception.getMessage(), equalTo("joiner can't be null")); assertThat(exception.getMessage(), equalTo("joiner can't be null"));
} }
@SuppressWarnings("unchecked") @SuppressWarnings({"unchecked", "deprecation"}) // specifically testing the deprecated variant
@Test @Test
public void testNumProcesses() { public void testNumProcesses() {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
@ -1303,7 +1315,7 @@ public class KStreamImplTest {
final KStream<String, String> stream1 = source1.filter((key, value) -> true) final KStream<String, String> stream1 = source1.filter((key, value) -> true)
.filterNot((key, value) -> false); .filterNot((key, value) -> false);
final KStream<String, Integer> stream2 = stream1.mapValues(s -> Integer.valueOf(s)); final KStream<String, Integer> stream2 = stream1.mapValues((ValueMapper<String, Integer>) Integer::valueOf);
final KStream<String, Integer> stream3 = source2.flatMapValues((ValueMapper<String, Iterable<Integer>>) final KStream<String, Integer> stream3 = source2.flatMapValues((ValueMapper<String, Iterable<Integer>>)
value -> Collections.singletonList(Integer.valueOf(value))); value -> Collections.singletonList(Integer.valueOf(value)));
@ -1330,6 +1342,8 @@ public class KStreamImplTest {
streams2[1].through("topic-6").process(new MockProcessorSupplier<>()); streams2[1].through("topic-6").process(new MockProcessorSupplier<>());
streams2[1].repartition().process(new MockProcessorSupplier<>());
assertEquals(2 + // sources assertEquals(2 + // sources
2 + // stream1 2 + // stream1
1 + // stream2 1 + // stream2
@ -1339,11 +1353,13 @@ public class KStreamImplTest {
5 * 2 + // stream2-stream3 joins 5 * 2 + // stream2-stream3 joins
1 + // to 1 + // to
2 + // through 2 + // through
1 + // process
3 + // repartition
1, // process 1, // process
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology().processors().size()); TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology().processors().size());
} }
@SuppressWarnings("rawtypes") @SuppressWarnings({"rawtypes", "deprecation"}) // specifically testing the deprecated variant
@Test @Test
public void shouldPreserveSerdesForOperators() { public void shouldPreserveSerdesForOperators() {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
@ -1393,6 +1409,11 @@ public class KStreamImplTest {
assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).keySerde(), mySerde);
assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).valueSerde(), mySerde);
assertEquals(((AbstractStream) stream1.repartition()).keySerde(), consumedInternal.keySerde());
assertEquals(((AbstractStream) stream1.repartition()).valueSerde(), consumedInternal.valueSerde());
assertEquals(((AbstractStream) stream1.repartition(Repartitioned.with(mySerde, mySerde))).keySerde(), mySerde);
assertEquals(((AbstractStream) stream1.repartition(Repartitioned.with(mySerde, mySerde))).valueSerde(), mySerde);
assertEquals(((AbstractStream) stream1.groupByKey()).keySerde(), consumedInternal.keySerde()); assertEquals(((AbstractStream) stream1.groupByKey()).keySerde(), consumedInternal.keySerde());
assertEquals(((AbstractStream) stream1.groupByKey()).valueSerde(), consumedInternal.valueSerde()); assertEquals(((AbstractStream) stream1.groupByKey()).valueSerde(), consumedInternal.valueSerde());
assertEquals(((AbstractStream) stream1.groupByKey(Grouped.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals(((AbstractStream) stream1.groupByKey(Grouped.with(mySerde, mySerde))).keySerde(), mySerde);
@ -1435,6 +1456,7 @@ public class KStreamImplTest {
assertNull(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).valueSerde()); assertNull(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).valueSerde());
} }
@Deprecated
@Test @Test
public void shouldUseRecordMetadataTimestampExtractorWithThrough() { public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
@ -1452,6 +1474,24 @@ public class KStreamImplTest {
assertNull(processorTopology.source("topic-1").getTimestampExtractor()); assertNull(processorTopology.source("topic-1").getTimestampExtractor());
} }
@Test
public void shouldUseRecordMetadataTimestampExtractorWithRepartition() {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), stringConsumed);
final KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
stream1.to("topic-5");
stream2.repartition(Repartitioned.as("topic-6"));
final ProcessorTopology processorTopology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").buildTopology();
assertThat(processorTopology.source("X-topic-6-repartition").getTimestampExtractor(), instanceOf(FailOnInvalidTimestamp.class));
assertNull(processorTopology.source("topic-4").getTimestampExtractor());
assertNull(processorTopology.source("topic-3").getTimestampExtractor());
assertNull(processorTopology.source("topic-2").getTimestampExtractor());
assertNull(processorTopology.source("topic-1").getTimestampExtractor());
}
@Deprecated
@Test @Test
public void shouldSendDataThroughTopicUsingProduced() { public void shouldSendDataThroughTopicUsingProduced() {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
@ -1467,6 +1507,21 @@ public class KStreamImplTest {
assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0)))); assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
} }
@Test
public void shouldSendDataThroughRepartitionTopicUsingRepartitioned() {
final StreamsBuilder builder = new StreamsBuilder();
final String input = "topic";
final KStream<String, String> stream = builder.stream(input, stringConsumed);
stream.repartition(Repartitioned.with(Serdes.String(), Serdes.String())).process(processorSupplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic.pipeInput("a", "b");
}
assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
}
@Test @Test
public void shouldSendDataToTopicUsingProduced() { public void shouldSendDataToTopicUsingProduced() {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();

View File

@ -194,9 +194,9 @@ public class StreamsGraphTest {
} }
// no need to optimize as user has already performed the repartitioning manually // no need to optimize as user has already performed the repartitioning manually
@Deprecated
@Test @Test
public void shouldNotOptimizeWhenAThroughOperationIsDone() { public void shouldNotOptimizeWhenAThroughOperationIsDone() {
final Topology attemptedOptimize = getTopologyWithThroughOperation(StreamsConfig.OPTIMIZE); final Topology attemptedOptimize = getTopologyWithThroughOperation(StreamsConfig.OPTIMIZE);
final Topology noOptimziation = getTopologyWithThroughOperation(StreamsConfig.NO_OPTIMIZATION); final Topology noOptimziation = getTopologyWithThroughOperation(StreamsConfig.NO_OPTIMIZATION);
@ -254,6 +254,7 @@ public class StreamsGraphTest {
} }
@Deprecated // specifically testing the deprecated variant
private Topology getTopologyWithThroughOperation(final String optimizeConfig) { private Topology getTopologyWithThroughOperation(final String optimizeConfig) {
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
@ -304,7 +305,7 @@ public class StreamsGraphTest {
return repartitionTopicsFound.size(); return repartitionTopicsFound.size();
} }
private String expectedJoinedTopology = "Topologies:\n" private final String expectedJoinedTopology = "Topologies:\n"
+ " Sub-topology: 0\n" + " Sub-topology: 0\n"
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+ " --> KSTREAM-WINDOWED-0000000002\n" + " --> KSTREAM-WINDOWED-0000000002\n"
@ -326,7 +327,7 @@ public class StreamsGraphTest {
+ " --> none\n" + " --> none\n"
+ " <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n"; + " <-- KSTREAM-JOINTHIS-0000000004, KSTREAM-JOINOTHER-0000000005\n\n";
private String expectedJoinedFilteredTopology = "Topologies:\n" private final String expectedJoinedFilteredTopology = "Topologies:\n"
+ " Sub-topology: 0\n" + " Sub-topology: 0\n"
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+ " --> KSTREAM-WINDOWED-0000000002\n" + " --> KSTREAM-WINDOWED-0000000002\n"
@ -351,7 +352,7 @@ public class StreamsGraphTest {
+ " --> none\n" + " --> none\n"
+ " <-- KSTREAM-MERGE-0000000006\n\n"; + " <-- KSTREAM-MERGE-0000000006\n\n";
private String expectedFullTopology = "Topologies:\n" private final String expectedFullTopology = "Topologies:\n"
+ " Sub-topology: 0\n" + " Sub-topology: 0\n"
+ " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
+ " --> KSTREAM-WINDOWED-0000000002\n" + " --> KSTREAM-WINDOWED-0000000002\n"
@ -382,7 +383,7 @@ public class StreamsGraphTest {
+ " <-- KSTREAM-MAPVALUES-0000000008\n\n"; + " <-- KSTREAM-MAPVALUES-0000000008\n\n";
private String expectedMergeOptimizedTopology = "Topologies:\n" + private final String expectedMergeOptimizedTopology = "Topologies:\n" +
" Sub-topology: 0\n" + " Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])\n" +
" --> KSTREAM-KEY-SELECT-0000000001\n" + " --> KSTREAM-KEY-SELECT-0000000001\n" +

View File

@ -132,7 +132,8 @@ public class EosTestClient extends SmokeTestUtil {
.to("sum", Produced.with(stringSerde, longSerde)); .to("sum", Produced.with(stringSerde, longSerde));
if (withRepartitioning) { if (withRepartitioning) {
final KStream<String, Integer> repartitionedData = data.through("repartition"); data.to("repartition");
final KStream<String, Integer> repartitionedData = builder.stream("repartition");
repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition")); repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"));

View File

@ -95,6 +95,9 @@ object ImplicitConversions {
implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] = implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
Produced.`with`[K, V] Produced.`with`[K, V]
implicit def repartitionedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Repartitioned[K, V] =
Repartitioned.`with`[K, V]
implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K], implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
valueSerde: Serde[V], valueSerde: Serde[V],
otherValueSerde: Serde[VO]): StreamJoined[K, V, VO] = otherValueSerde: Serde[VO]): StreamJoined[K, V, VO] =

View File

@ -218,7 +218,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* import Serdes._ * import Serdes._
* *
* //.. * //..
* val clicksPerRegion: KTable[String, Long] = //.. * val clicksPerRegion: KStream[String, Long] = //..
* *
* // Implicit serdes in scope will generate an implicit Produced instance, which * // Implicit serdes in scope will generate an implicit Produced instance, which
* // will be passed automatically to the call of through below * // will be passed automatically to the call of through below
@ -232,10 +232,53 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @param produced the instance of Produced that gives the serdes and `StreamPartitioner` * @param produced the instance of Produced that gives the serdes and `StreamPartitioner`
* @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]] * @return a [[KStream]] that contains the exact same (and potentially repartitioned) records as this [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#through` * @see `org.apache.kafka.streams.kstream.KStream#through`
* @deprecated use `repartition()` instead
*/ */
@deprecated("use `repartition()` instead", "2.6.0")
def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] = def through(topic: String)(implicit produced: Produced[K, V]): KStream[K, V] =
new KStream(inner.through(topic, produced)) new KStream(inner.through(topic, produced))
/**
* Materialize this stream to a topic and create a new [[KStream]] from the topic using the `Repartitioned` instance
* for configuration of the `Serde key serde`, `Serde value serde`, `StreamPartitioner`, number of partitions, and
* topic name part.
* <p>
* The created topic is considered as an internal topic and is meant to be used only by the current Kafka Streams instance.
* Similar to auto-repartitioning, the topic will be created with infinite retention time and data will be automatically purged by Kafka Streams.
* The topic will be named as "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is user-specified in
* `StreamsConfig` via parameter `APPLICATION_ID_CONFIG`,
* "&lt;name&gt;" is either provided via `Repartitioned#as(String)` or an internally
* generated name, and "-repartition" is a fixed suffix.
* <p>
* The user can either supply the `Repartitioned` instance as an implicit in scope or provide implicit
* key and value serdes that will be converted to a `Repartitioned` instance implicitly.
* <p>
* {{{
* Example:
*
* // brings implicit serdes in scope
* import Serdes._
*
* //..
* val clicksPerRegion: KStream[String, Long] = //..
*
* // Implicit serdes in scope will generate an implicit Repartitioned instance, which
* // will be passed automatically to the call of repartition below
* clicksPerRegion.repartition
*
* // Similarly you can create an implicit Repartitioned and it will be passed implicitly
* // to the repartition call
* }}}
*
* @param repartitioned the `Repartitioned` instance used to specify `Serdes`, the `StreamPartitioner` that determines
* how records are distributed among partitions of the topic,
* part of the topic name, and number of partitions for a repartition topic.
* @return a [[KStream]] that contains the exact same repartitioned records as this [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#repartition`
*/
def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =
new KStream(inner.repartition(repartitioned))
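For completeness, a small sketch (stream, topic, name, and partition count are illustrative) of supplying an explicit `Repartitioned` to fix the "<name>" part of the "${applicationId}-<name>-repartition" topic and its partition count:
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, Repartitioned}
val builder = new StreamsBuilder()
val clicksPerRegion: KStream[String, Long] = builder.stream[String, Long]("clicks-per-region")
// With application.id "my-app" this materializes to "my-app-clicks-by-region-repartition",
// spread over 10 partitions.
val redistributed: KStream[String, Long] =
  clicksPerRegion.repartition(
    Repartitioned.`with`[String, Long]("clicks-by-region").withNumberOfPartitions(10)
  )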
/** /**
* Materialize this stream to a topic using the `Produced` instance for * Materialize this stream to a topic using the `Produced` instance for
* configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner` * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Repartitioned => RepartitionedJ}
import org.apache.kafka.streams.processor.StreamPartitioner
object Repartitioned {
/**
* Create a Repartitioned instance with provided keySerde and valueSerde.
*
* @tparam K key type
* @tparam V value type
* @param keySerde Serde to use for serializing the key
* @param valueSerde Serde to use for serializing the value
* @return A new [[Repartitioned]] instance configured with keySerde and valueSerde
* @see KStream#repartition(Repartitioned)
*/
def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] =
RepartitionedJ.`with`(keySerde, valueSerde)
/**
* Create a Repartitioned instance with provided keySerde, valueSerde, and name used as part of the repartition topic.
*
* @tparam K key type
* @tparam V value type
* @param name the name used as a processor name and as part of the repartition topic name.
* @param keySerde Serde to use for serializing the key
* @param valueSerde Serde to use for serializing the value
* @return A new [[Repartitioned]] instance configured with keySerde, valueSerde, and processor and repartition topic name
* @see KStream#repartition(Repartitioned)
*/
def `with`[K, V](name: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] =
RepartitionedJ.`as`(name).withKeySerde(keySerde).withValueSerde(valueSerde)
/**
* Create a Repartitioned instance with provided keySerde, valueSerde, and partitioner.
*
* @tparam K key type
* @tparam V value type
* @param partitioner the function used to determine how records are distributed among partitions of the topic;
* if not specified and `keySerde` provides a
* [[org.apache.kafka.streams.kstream.internals.WindowedSerializer]] for the key, a
* [[org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner]] will be
* used; otherwise [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]
* will be used
* @param keySerde Serde to use for serializing the key
* @param valueSerde Serde to use for serializing the value
* @return A new [[Repartitioned]] instance configured with keySerde, valueSerde, and partitioner
* @see KStream#repartition(Repartitioned)
*/
def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K],
valueSerde: Serde[V]): RepartitionedJ[K, V] =
RepartitionedJ.`streamPartitioner`(partitioner).withKeySerde(keySerde).withValueSerde(valueSerde)
/**
* Create a Repartitioned instance with provided keySerde, valueSerde, and number of partitions for repartition topic.
*
* @tparam K key type
* @tparam V value type
* @param numberOfPartitions number of partitions used when creating repartition topic
* @param keySerde Serde to use for serializing the key
* @param valueSerde Serde to use for serializing the value
* @return A new [[Repartitioned]] instance configured with keySerde, valueSerde, and number of partitions
* @see KStream#repartition(Repartitioned)
*/
def `with`[K, V](numberOfPartitions: Int)(implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] =
RepartitionedJ.`numberOfPartitions`(numberOfPartitions).withKeySerde(keySerde).withValueSerde(valueSerde)
}
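A usage sketch (values are illustrative) exercising the four `with` overloads above; further options can be layered on via the Java builder methods such as withName and withNumberOfPartitions.
import org.apache.kafka.streams.processor.StreamPartitioner
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Repartitioned
// Serdes only.
val fromSerdes: Repartitioned[String, Long] = Repartitioned.`with`[String, Long]
// Processor name and the "<name>" part of the repartition topic.
val named: Repartitioned[String, Long] = Repartitioned.`with`[String, Long]("clicks-by-region")
// Fixed number of partitions for the repartition topic.
val tenPartitions: Repartitioned[String, Long] = Repartitioned.`with`[String, Long](10)
// Custom partitioner, refined further through the Java builder methods.
val everyRecordToPartitionZero = new StreamPartitioner[String, Long] {
  override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0
}
val custom: Repartitioned[String, Long] =
  Repartitioned
    .`with`[String, Long](everyRecordToPartitionZero)
    .withName("clicks-by-region")
    .withNumberOfPartitions(10)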

View File

@ -23,6 +23,7 @@ package object kstream {
type Grouped[K, V] = org.apache.kafka.streams.kstream.Grouped[K, V] type Grouped[K, V] = org.apache.kafka.streams.kstream.Grouped[K, V]
type Consumed[K, V] = org.apache.kafka.streams.kstream.Consumed[K, V] type Consumed[K, V] = org.apache.kafka.streams.kstream.Consumed[K, V]
type Produced[K, V] = org.apache.kafka.streams.kstream.Produced[K, V] type Produced[K, V] = org.apache.kafka.streams.kstream.Produced[K, V]
type Repartitioned[K, V] = org.apache.kafka.streams.kstream.Repartitioned[K, V]
type Joined[K, V, VO] = org.apache.kafka.streams.kstream.Joined[K, V, VO] type Joined[K, V, VO] = org.apache.kafka.streams.kstream.Joined[K, V, VO]
type StreamJoined[K, V, VO] = org.apache.kafka.streams.kstream.StreamJoined[K, V, VO] type StreamJoined[K, V, VO] = org.apache.kafka.streams.kstream.StreamJoined[K, V, VO]
} }

View File

@ -156,6 +156,36 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testDriver.close() testDriver.close()
} }
"repartition" should "repartition a KStream" in {
val builder = new StreamsBuilder()
val sourceTopic = "source"
val repartitionName = "repartition"
val sinkTopic = "sink"
builder.stream[String, String](sourceTopic).repartition(Repartitioned.`with`(repartitionName)).to(sinkTopic)
val testDriver = createTestDriver(builder)
val testInput = testDriver.createInput[String, String](sourceTopic)
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value1")
val kv1 = testOutput.readKeyValue
kv1.key shouldBe "1"
kv1.value shouldBe "value1"
testInput.pipeInput("2", "value2")
val kv2 = testOutput.readKeyValue
kv2.key shouldBe "2"
kv2.value shouldBe "value2"
testOutput.isEmpty shouldBe true
// appId == "test"
testDriver.producedTopicNames().contains("test-" + repartitionName + "-repartition") shouldBe true
testDriver.close()
}
"join 2 KStreams" should "join correctly records" in { "join 2 KStreams" should "join correctly records" in {
val builder = new StreamsBuilder() val builder = new StreamsBuilder()
val sourceTopic1 = "source1" val sourceTopic1 = "source1"

View File

@ -37,15 +37,15 @@ class ProducedTest extends FlatSpec with Matchers {
internalProduced.valueSerde.getClass shouldBe Serdes.Long.getClass internalProduced.valueSerde.getClass shouldBe Serdes.Long.getClass
} }
"Create a Produced with timestampExtractor and resetPolicy" should "create a Consumed with Serdes, timestampExtractor and resetPolicy" in { "Create a Produced with streamPartitioner" should "create a Produced with Serdes and streamPartitioner" in {
val partitioner = new StreamPartitioner[String, Long] { val partitioner = new StreamPartitioner[String, Long] {
override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0 override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0
} }
val produced: Produced[String, Long] = Produced.`with`(partitioner) val produced: Produced[String, Long] = Produced.`with`(partitioner)
val internalConsumed = new ProducedInternal(produced) val internalProduced = new ProducedInternal(produced)
internalConsumed.keySerde.getClass shouldBe Serdes.String.getClass internalProduced.keySerde.getClass shouldBe Serdes.String.getClass
internalConsumed.valueSerde.getClass shouldBe Serdes.Long.getClass internalProduced.valueSerde.getClass shouldBe Serdes.Long.getClass
internalConsumed.streamPartitioner shouldBe partitioner internalProduced.streamPartitioner shouldBe partitioner
} }
} }

View File

@ -0,0 +1,89 @@
/*
* Copyright (C) 2018 Joan Goyeau.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.internals.RepartitionedInternal
import org.apache.kafka.streams.processor.StreamPartitioner
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.Serdes
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class RepartitionedTest extends FlatSpec with Matchers {
"Create a Repartitioned" should "create a Repartitioned with Serdes" in {
val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long]
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.String.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.Long.getClass
}
"Create a Repartitioned with numPartitions" should "create a Repartitioned with Serdes and numPartitions" in {
val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long](5)
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.String.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.Long.getClass
internalRepartitioned.numberOfPartitions shouldBe 5
}
"Create a Repartitioned with topicName" should "create a Repartitioned with Serdes and topicName" in {
val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long]("repartitionTopic")
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.String.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.Long.getClass
internalRepartitioned.name shouldBe "repartitionTopic"
}
"Create a Repartitioned with streamPartitioner" should "create a Repartitioned with Serdes, numPartitions, topicName and streamPartitioner" in {
val partitioner = new StreamPartitioner[String, Long] {
override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0
}
val repartitioned: Repartitioned[String, Long] = Repartitioned.`with`[String, Long](partitioner)
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.String.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.Long.getClass
internalRepartitioned.streamPartitioner shouldBe partitioner
}
"Create a Repartitioned with numPartitions, topicName, and streamPartitioner" should "create a Repartitioned with Serdes, numPartitions, topicName and streamPartitioner" in {
val partitioner = new StreamPartitioner[String, Long] {
override def partition(topic: String, key: String, value: Long, numPartitions: Int): Integer = 0
}
val repartitioned: Repartitioned[String, Long] =
Repartitioned
.`with`[String, Long](5)
.withName("repartitionTopic")
.withStreamPartitioner(partitioner)
val internalRepartitioned = new RepartitionedInternal(repartitioned)
internalRepartitioned.keySerde.getClass shouldBe Serdes.String.getClass
internalRepartitioned.valueSerde.getClass shouldBe Serdes.Long.getClass
internalRepartitioned.numberOfPartitions shouldBe 5
internalRepartitioned.name shouldBe "repartitionTopic"
internalRepartitioned.streamPartitioner shouldBe partitioner
}
}