KAFKA-9466: Update Kafka Streams docs for KIP-447 (#8621)

Reviewers: Boyang Chen <boyang@confluent.io>, Jim Galasyn <jim.galasyn@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
Matthias J. Sax 2020-05-11 19:11:58 -07:00 committed by GitHub
parent 6d0e722f4d
commit 318063a16a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 14 deletions

View File

@ -206,17 +206,26 @@
to the stream processing pipeline, known as the <a href="http://lambda-architecture.net/">Lambda Architecture</a>.
Prior to 0.11.0.0, Kafka only provides at-least-once delivery guarantees and hence any stream processing systems that leverage it as the backend storage could not guarantee end-to-end exactly-once semantics.
In fact, even for those stream processing systems that claim to support exactly-once processing, as long as they are reading from / writing to Kafka as the source / sink, their applications cannot actually guarantee that
no duplicates will be generated throughout the pipeline.
no duplicates will be generated throughout the pipeline.<br />
Since the 0.11.0.0 release, Kafka has added support to allow its producers to send messages to different topic partitions in a <a href="https://kafka.apache.org/documentation/#semantics">transactional and idempotent manner</a>,
and Kafka Streams has hence added the end-to-end exactly-once processing semantics by leveraging these features.
More specifically, it guarantees that for any record read from the source Kafka topics, its processing results will be reflected exactly once in the output Kafka topic as well as in the state stores for stateful operations.
Note the key difference between Kafka Streams end-to-end exactly-once guarantee with other stream processing frameworks' claimed guarantees is that Kafka Streams tightly integrates with the underlying Kafka storage system and ensure that
commits on the input topic offsets, updates on the state stores, and writes to the output topics will be completed atomically instead of treating Kafka as an external system that may have side-effects.
To read more details on how this is done inside Kafka Streams, readers are recommended to read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics">KIP-129</a>.
For more information on how this is done inside Kafka Streams, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics">KIP-129</a>.<br />
In order to achieve exactly-once semantics when running Kafka Streams applications, users can simply set the <code>processing.guarantee</code> config value to <b>exactly_once</b> (default value is <b>at_least_once</b>).
More details can be found in the <a href="/{{version}}/documentation#streamsconfigs"><b>Kafka Streams Configs</b></a> section.
As of the 2.6.0 release, Kafka Streams supports an improved implementation of exactly-once processing, named "exactly-once beta",
which requires broker version 2.5.0 or newer.
This implementation is more efficient, because it reduces client and broker resource utilization, like client threads and used network connections,
and it enables higher throughput and improved scalability.
For more information on how this is done inside the brokers and Kafka Streams, see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a>.<br />
To enable exactly-once semantics when running Kafka Streams applications,
set the <code>processing.guarantee</code> config value (default value is <b>at_least_once</b>)
to <b>exactly_once</b> (requires brokers version 0.11.0 or newer) or <b>exactly_once_beta</b> (requires brokers version 2.5 or newer).
For more information, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html">Kafka Streams Configs</a> section.
</p>
<h3><a id="streams_out_of_ordering" href="#streams_out_of_ordering">Out-of-Order Handling</a></h3>

View File

@ -245,8 +245,9 @@
<td>See <a class="reference internal" href="#streams-developer-guide-partition-grouper"><span class="std std-ref">Partition Grouper</span></a></td>
</tr>
<tr class="row-even"><td>processing.guarantee</td>
<td>Low</td>
<td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default) or <code class="docutils literal"><span class="pre">"exactly_once"</span></code>.
<td>Medium</td>
<td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default),
<code class="docutils literal"><span class="pre">"exactly_once"</span></code>, or <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>.
<td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantedd"><span class="std std-ref">Processing Guarantee</span></a></td>
</tr>
<tr class="row-odd"><td>poll.ms</td>
@ -456,13 +457,22 @@
<div class="section" id="processing-guarantee">
<span id="streams-developer-guide-processing-guarantee"></span><h4><a class="toc-backref" href="#id25">processing.guarantee</a><a class="headerlink" href="#processing-guarantee" title="Permalink to this headline"></a></h4>
<blockquote>
<div>The processing guarantee that should be used. Possible values are <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default) and <code class="docutils literal"><span class="pre">"exactly_once"</span></code>.
Note that if exactly-once processing is enabled, the default for parameter <code class="docutils literal"><span class="pre">commit.interval.ms</span></code> changes to 100ms.
<div>The processing guarantee that should be used.
Possible values are <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default),
<code class="docutils literal"><span class="pre">"exactly_once"</span></code>,
and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>.
Using <code class="docutils literal"><span class="pre">"exactly_once"</span></code> requires broker
version 0.11.0 or newer, while using <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>
requires broker version 2.5 or newer.
Note that if exactly-once processing is enabled, the default for parameter
<code class="docutils literal"><span class="pre">commit.interval.ms</span></code> changes to 100ms.
Additionally, consumers are configured with <code class="docutils literal"><span class="pre">isolation.level="read_committed"</span></code>
and producers are configured with <code class="docutils literal"><span class="pre">retries=Integer.MAX_VALUE</span></code>, <code class="docutils literal"><span class="pre">enable.idempotence=true</span></code>,
and <code class="docutils literal"><span class="pre">max.in.flight.requests.per.connection=1</span></code> per default.
and producers are configured with <code class="docutils literal"><span class="pre">enable.idempotence=true</span></code> per default.
Note that by default exactly-once processing requires a cluster of at least three brokers what is the recommended setting for production.
For development you can change this, by adjusting broker setting <code class="docutils literal"><span class="pre">transaction.state.log.replication.factor</span></code> and <code class="docutils literal"><span class="pre">transaction.state.log.min.isr</span></code> to the number of broker you want to use.
For development, you can change this configuration by adjusting broker setting
<code class="docutils literal"><span class="pre">transaction.state.log.replication.factor</span></code>
and <code class="docutils literal"><span class="pre">transaction.state.log.min.isr</span></code>
to the number of brokers you want to use.
For more details see <a href="../core-concepts#streams_processing_guarantee">Processing Guarantees</a>.
</div></blockquote>
</div>

View File

@ -52,6 +52,20 @@
<li> restart all new ({{fullDotVersion}}) application instances </li>
</ul>
<p>
Starting in Kafka Streams 2.6.x, a new processing mode <code>"exactly_once_beta"</code> (configurable via parameter
<code>processing.guarantee</code>) is available.
To use this new feature, your brokers must be on version 2.5.x or newer.
A switch from <code>"exactly_once"</code> to <code>"exactly_once_beta"</code> (or the other way around) is
only possible if the application is on version 2.6.x.
If you want to upgrade your application from an older version and enable this feature,
you first need to upgrade your application to version 2.6.x, staying on <code>"exactly_once"</code>,
and then do second round of rolling bounces to switch to <code>"exactly_once_beta"</code>.
For a downgrade, do the reverse: first switch the config from <code>"exactly_once_beta"</code> to
<code>"exactly_once"</code> to disable the feature in your 2.6.x application.
Afterward, you can downgrade your application to a pre-2.6.x version.
</p>
<p>
To run a Kafka Streams application version 2.2.1, 2.3.0, or higher a broker version 0.11.0 or higher is required
and the on-disk message format must be 0.11 or higher.
@ -72,6 +86,15 @@
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>
<h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
<p>
We added a new processing mode that improves application scalability using exactly-once guarantees
(via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a>).
You can enable this new feature by setting the configuration parameter <code>processing.guarantee</code> to the
new value <code>"exactly_once_beta"</code>.
Note that you need brokers with version 2.5 or newer to use this feature.
</p>
<h3><a id="streams_api_changes_250" href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
<p>
We add a new <code>cogroup()</code> operator (via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup">KIP-150</a>>)
@ -761,16 +784,19 @@
<p> Metrics using exactly-once semantics: </p>
<p>
If exactly-once processing is enabled via the <code>processing.guarantees</code> parameter, internally Streams switches from a producer per thread to a producer per task runtime model.
If <code>"exactly_once"</code> processing is enabled via the <code>processing.guarantee</code> parameter,
internally Streams switches from a producer-per-thread to a producer-per-task runtime model.
Using <code>"exactly_once_beta"</code> does use a producer-per-thread, so <code>client.id</code> doesn't change,
compared with <code>"at_least_once"</code> for this case).
In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID for this case.
Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
</p>
<p> Producer's <code>client.id</code> naming schema: </p>
<ul>
<li> at-least-once (default): <code>[client.Id]-StreamThread-[sequence-number]</code> </li>
<li> exactly-once: <code>[client.Id]-StreamThread-[sequence-number]-[taskId]</code> </li>
<li> exactly-once-beta: <code>[client.Id]-StreamThread-[sequence-number]</code> </li>
</ul>
<p> <code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>

View File

@ -19,6 +19,13 @@
<script id="upgrade-template" type="text/x-handlebars-template">
<h5><a id="upgrade_260_notable" href="#upgrade_260_notable">Notable changes in 2.6.0</a></h5>
<ul>
<li>Kafka Streams adds a new processing mode (requires broker 2.5 or newer) that improves application
scalability using exactly-once guarantees
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a></li>
</ul>
<h5><a id="upgrade_250_notable" href="#upgrade_250_notable">Notable changes in 2.5.0</a></h5>
<ul>
<li>When <code>RebalanceProtocol#COOPERATIVE</code> is used, <code>Consumer#poll</code> can still return data

View File

@ -464,7 +464,8 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. " +
"Possible values are <code>" + AT_LEAST_ONCE + "</code> (default), <code>" + EXACTLY_ONCE + "</code>, " +
"Possible values are <code>" + AT_LEAST_ONCE + "</code> (default), " +
"<code>" + EXACTLY_ONCE + "</code> (requires brokers version 0.11.0 or higher), " +
"and <code>" + EXACTLY_ONCE_BETA + "</code> (requires brokers version 2.5 or higher). " +
"Note that exactly-once processing requires a cluster of at least three brokers by default what is the " +
"recommended setting for production; for development you can change this, by adjusting broker setting " +