MINOR: update Kafka Streams standby task config (#11404)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Antony Stubbs <antony@confluent.io>, James Galasyn <jim.galasyn@confluent.io>
Matthias J. Sax 2021-11-16 17:34:49 -08:00 committed by GitHub
parent d37aaf68de
commit 30d1989db1
2 changed files with 127 additions and 106 deletions


@@ -55,43 +55,47 @@ settings.put(... , ...);</code></pre>
<p>This section contains the most common Streams configuration parameters. For a full reference, see the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> Javadocs.</p>
<div class="contents local topic" id="contents">
<ul class="simple">
<li><a class="reference internal" href="#required-configuration-parameters" id="id3">Required configuration parameters</a><ul>
<li><a class="reference internal" href="#application-id" id="id4">application.id</a></li>
<li><a class="reference internal" href="#bootstrap-servers" id="id5">bootstrap.servers</a></li>
</ul>
<li><a class="reference internal" href="#required-configuration-parameters" id="id3">Required configuration parameters</a>
<ul>
<li><a class="reference internal" href="#application-id" id="id4">application.id</a></li>
<li><a class="reference internal" href="#bootstrap-servers" id="id5">bootstrap.servers</a></li>
</ul>
</li>
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a><ul>
<li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
<li><a class="reference internal" href="#default-deserialization-exception-handler" id="id7">default.deserialization.exception.handler</a></li>
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
<li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#default-windowed-key-serde-inner" id="id32">default.windowed.key.serde.inner</a></li>
<li><a class="reference internal" href="#default-windowed-value-serde-inner" id="id33">default.windowed.value.serde.inner</a></li>
<li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
<li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
<li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
<li><a class="reference internal" href="#num-stream-threads" id="id11">num.stream.threads</a></li>
<li><a class="reference internal" href="#partition-grouper" id="id12">partition.grouper</a></li>
<li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
<li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
<li><a class="reference internal" href="#replication-factor" id="id13">replication.factor</a></li>
<li><a class="reference internal" href="#rocksdb-config-setter" id="id20">rocksdb.config.setter</a></li>
<li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
</ul>
<li><a class="reference internal" href="#recommended-configuration-parameters-for-resiliency" id="id21">Recommended configuration parameters for resiliency</a>
<ul>
<li><a class="reference internal" href="#acks" id="id22">acks</a></li>
<li><a class="reference internal" href="#id2" id="id23">replication.factor</a></li>
</ul>
</li>
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a><ul>
<li><a class="reference internal" href="#naming" id="id17">Naming</a></li>
<li><a class="reference internal" href="#default-values" id="id18">Default Values</a></li>
<li><a class="reference internal" href="#enable-auto-commit" id="id19">enable.auto.commit</a></li>
</ul>
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a>
<ul>
<li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
<li><a class="reference internal" href="#default-deserialization-exception-handler" id="id7">default.deserialization.exception.handler</a></li>
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
<li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#default-windowed-key-serde-inner" id="id32">default.windowed.key.serde.inner</a></li>
<li><a class="reference internal" href="#default-windowed-value-serde-inner" id="id33">default.windowed.value.serde.inner</a></li>
<li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
<li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
<li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
<li><a class="reference internal" href="#num-stream-threads" id="id11">num.stream.threads</a></li>
<li><a class="reference internal" href="#partition-grouper" id="id12">partition.grouper</a></li>
<li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
<li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
<li><a class="reference internal" href="#replication-factor" id="id13">replication.factor</a></li>
<li><a class="reference internal" href="#rocksdb-config-setter" id="id20">rocksdb.config.setter</a></li>
<li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
</ul>
</li>
<li><a class="reference internal" href="#recommended-configuration-parameters-for-resiliency" id="id21">Recommended configuration parameters for resiliency</a><ul>
<li><a class="reference internal" href="#acks" id="id22">acks</a></li>
<li><a class="reference internal" href="#id2" id="id23">replication.factor</a></li>
</ul>
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
<ul>
<li><a class="reference internal" href="#naming" id="id17">Naming</a></li>
<li><a class="reference internal" href="#default-values" id="id18">Default Values</a></li>
<li><a class="reference internal" href="#enable-auto-commit" id="id19">enable.auto.commit</a></li>
</ul>
</li>
</ul>
</div>
@@ -146,6 +150,74 @@ settings.put(... , ...);</code></pre>
</div></blockquote>
</div>
</div>
<div class="section" id="recommended-configuration-parameters-for-resiliency">
<h3><a class="toc-backref" href="#id21">Recommended configuration parameters for resiliency</a><a class="headerlink" href="#recommended-configuration-parameters-for-resiliency" title="Permalink to this headline"></a></h3>
<p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:</p>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-odd"><th class="head">Parameter Name</th>
<th class="head">Corresponding Client</th>
<th class="head">Default value</th>
<th class="head">Consider setting to</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>acks</td>
<td>Producer</td>
<td><code class="docutils literal"><span class="pre">acks=1</span></code></td>
<td><code class="docutils literal"><span class="pre">acks=all</span></code></td>
</tr>
<tr class="row-odd"><td>replication.factor (for broker version 2.3 or older)/td>
<td>Streams</td>
<td><code class="docutils literal"><span class="pre">-1</span></code></td>
<td><code class="docutils literal"><span class="pre">3</span></code></td>
</tr>
<tr class="row-even"><td>min.insync.replicas</td>
<td>Broker</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
<tr class="row-odd"><td>num.standby.replicas</td>
<td>Streams</td>
<td><code class="docutils literal"><span class="pre">0</span></code></td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
</tr>
</tbody>
</table>
<p>Increasing the replication factor to 3 ensures that the internal Kafka Streams topic can tolerate up to 2 broker failures. Changing the acks setting to &#8220;all&#8221;
guarantees that a record will not be lost as long as one replica is alive. The tradeoff of moving from the default values to the recommended ones is
that some performance is sacrificed and more storage space is required (3x with a replication factor of 3) in exchange for greater resiliency.</p>
<div class="section" id="acks">
<h4><a class="toc-backref" href="#id22">acks</a><a class="headerlink" href="#acks" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The number of acknowledgments that the leader must have received before considering a request complete. This controls
the durability of records that are sent. The possible values are:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">acks=0</span></code> The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the <code class="docutils literal"><span class="pre">retries</span></code> configuration will not take effect (as the client won&#8217;t generally know of any failures). The offset returned for each record will always be set to <code class="docutils literal"><span class="pre">-1</span></code>.</li>
<li><code class="docutils literal"><span class="pre">acks=1</span></code> The leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost.</li>
<li><code class="docutils literal"><span class="pre">acks=all</span></code> The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee.</li>
</ul>
<p>For more information, see the <a class="reference external" href="https://kafka.apache.org/documentation/#producerconfigs">Kafka Producer documentation</a>.</p>
</div></blockquote>
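<p>As a minimal sketch of the producer-level setting (a standalone <code class="docutils literal"><span class="pre">KafkaProducer</span></code> is shown for illustration; the broker address and serializers are assumptions, and within a Streams application the same value is passed through <code class="docutils literal"><span class="pre">producerPrefix</span></code> as shown at the end of this section):</p>
<pre class="line-numbers"><code class="language-java">Properties producerSettings = new Properties();
producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // assumed broker address
producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// wait for all in-sync replicas to acknowledge each record before it counts as sent
producerSettings.put(ProducerConfig.ACKS_CONFIG, "all");</code></pre>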
</div>
<div class="section" id="id2">
<h4><a class="toc-backref" href="#id23">replication.factor</a><a class="headerlink" href="#id2" title="Permalink to this headline"></a></h4>
<blockquote>
<div>See the <a class="reference internal" href="#replication-factor-parm"><span class="std std-ref">description here</span></a>.</div></blockquote>
</div>
<div class="section" id="i32">
<h4><a class="toc-backref" href="#id23">num.standby.replicas</a><a class="headerlink" href="#id2" title="Permalink to this headline"></a></h4>
<blockquote>
<div>See the <a class="reference internal" href="#streams-developer-guide-standby-replicas"><span class="std std-ref">description here</span></a>.</div></blockquote>
</div>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// for broker version 2.3 or older
//streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</div>
<div class="section" id="optional-configuration-parameters">
<span id="streams-developer-guide-optional-configs"></span><h3><a class="toc-backref" href="#id6">Optional configuration parameters</a><a class="headerlink" href="#optional-configuration-parameters" title="Permalink to this headline"></a></h3>
<p>Here are the optional <a href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> javadocs, sorted by level of importance:</p>
@@ -280,7 +352,7 @@ settings.put(... , ...);</code></pre>
<td>30000 milliseconds (30 seconds)</td>
</tr>
<tr class="row-even"><td>num.standby.replicas</td>
<td>Medium</td>
<td>High</td>
<td colspan="2">The number of standby replicas for each task.</td>
<td>0</td>
</tr>
@@ -680,14 +752,19 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
preferred to restart on an instance that has standby replicas so that the local state store restoration process from its
changelog can be minimized. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of
resuming tasks on failover can be found in the <a class="reference internal" href="../architecture.html#streams_architecture_state"><span class="std std-ref">State</span></a> section.
<dl class="docutils">
<dt>Recommendation:</dt>
<dd>Increase the number of standbys to 1 to get instant fail-over, i.e., high-availability.
Increasing the number of standbys requires more client-side storage space.
For example, with 1 standby, 2x space is required.</dd>
</dl>
<dl class="docutils">
<dt>Note:</dt>
<dd>If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite> <code class="docutils literal"><span class="pre">KafkaStreams</span></code> instances.</dd>
</dl>
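<p>A minimal sketch of enabling one standby replica (the application id and broker address are placeholders):</p>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder application id
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder broker address
// one standby per task; deploy n + 1 = 2 KafkaStreams instances so the standby can be hosted on another instance
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>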
</div>
</blockquote>
</div>
<div class="admonition note">
<p class="first admonition-title">Note</p>
<p class="last">If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite> <code class="docutils literal"><span class="pre">KafkaStreams</span></code>
instances.</p>
</div>
<div class="section" id="num-stream-threads">
<h4><a class="toc-backref" href="#id11">num.stream.threads</a><a class="headerlink" href="#num-stream-threads" title="Permalink to this headline"></a></h4>
<blockquote>
@@ -755,7 +832,7 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
<dl class="docutils">
<dt>Recommendation:</dt>
<dd>Increase the replication factor to 3 to ensure that the internal Kafka Streams topic can tolerate up to 2 broker failures.
Note that you will require more storage space as well (3 times more with the replication factor of 3).</dd>
Note that you will require more storage space as well (3x with the replication factor of 3).</dd>
</dl>
</div></blockquote>
</div>
@@ -953,19 +1030,19 @@ streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
<tbody valign="top">
<tr class="row-odd"><td>allow.auto.create.topics</td>
<td>Consumer</td>
<td>false</td>
<td><code class="docutils literal"><span class="pre">false</span></code></td>
</tr>
<tr class="row-even"><td>auto.offset.reset</td>
<td>Consumer</td>
<td>earliest</td>
<td><code class="docutils literal"><span class="pre">earliest</span></code></td>
</tr>
<tr class="row-odd"><td>linger.ms</td>
<td>Producer</td>
<td>100</td>
<td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr>
<tr class="row-odd"><td>max.poll.records</td>
<td>Consumer</td>
<td>1000</td>
<td><code class="docutils literal"><span class="pre">1000</span></code></td>
</tr>
</tbody>
</table>
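<p>If these defaults do not fit your workload, they can be overridden through the corresponding prefix helpers; a hedged sketch with illustrative values:</p>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// override the embedded consumer's Streams default (illustrative value)
streamsSettings.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);
// override the embedded producer's Streams default (illustrative value)
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 10);</code></pre>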
@@ -976,62 +1053,6 @@ streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
value to <code class="docutils literal"><span class="pre">false</span></code>. Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
to commit the current processing state.</div></blockquote>
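<p>For illustration, a user-initiated commit request can be made from the Processor API; the sketch below assumes the <code class="docutils literal"><span class="pre">org.apache.kafka.streams.processor.api</span></code> interfaces and a hypothetical processor name:</p>
<pre class="line-numbers"><code class="language-java">public class CommitRequestingProcessor implements Processor&lt;String, String, String, String&gt; {
    private ProcessorContext&lt;String, String&gt; context;

    @Override
    public void init(final ProcessorContext&lt;String, String&gt; context) {
        this.context = context;
    }

    @Override
    public void process(final Record&lt;String, String&gt; record) {
        // ... regular processing of the record would happen here ...
        // ask Kafka Streams to commit the current processing state as soon as possible
        context.commit();
    }
}</code></pre>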
</div>
<div class="section" id="recommended-configuration-parameters-for-resiliency">
<h3><a class="toc-backref" href="#id21">Recommended configuration parameters for resiliency</a><a class="headerlink" href="#recommended-configuration-parameters-for-resiliency" title="Permalink to this headline"></a></h3>
<p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:</p>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-odd"><th class="head">Parameter Name</th>
<th class="head">Corresponding Client</th>
<th class="head">Default value</th>
<th class="head">Consider setting to</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>acks</td>
<td>Producer</td>
<td><code class="docutils literal"><span class="pre">acks=1</span></code></td>
<td><code class="docutils literal"><span class="pre">acks=all</span></code></td>
</tr>
<tr class="row-odd"><td>replication.factor</td>
<td>Streams</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
<td><code class="docutils literal"><span class="pre">3</span></code></td>
</tr>
<tr class="row-even"><td>min.insync.replicas</td>
<td>Broker</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
</tbody>
</table>
<p>Increasing the replication factor to 3 ensures that the internal Kafka Streams topic can tolerate up to 2 broker failures. Changing the acks setting to &#8220;all&#8221;
guarantees that a record will not be lost as long as one replica is alive. The tradeoff of moving from the default values to the recommended ones is
that some performance is sacrificed and more storage space is required (3x with a replication factor of 3) in exchange for greater resiliency.</p>
<div class="section" id="acks">
<h4><a class="toc-backref" href="#id22">acks</a><a class="headerlink" href="#acks" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The number of acknowledgments that the leader must have received before considering a request complete. This controls
the durability of records that are sent. The possible values are:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">acks=0</span></code> The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the <code class="docutils literal"><span class="pre">retries</span></code> configuration will not take effect (as the client won&#8217;t generally know of any failures). The offset returned for each record will always be set to <code class="docutils literal"><span class="pre">-1</span></code>.</li>
<li><code class="docutils literal"><span class="pre">acks=1</span></code> The leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost.</li>
<li><code class="docutils literal"><span class="pre">acks=all</span></code> The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee.</li>
</ul>
<p>For more information, see the <a class="reference external" href="https://kafka.apache.org/documentation/#producerconfigs">Kafka Producer documentation</a>.</p>
</div></blockquote>
</div>
<div class="section" id="id2">
<h4><a class="toc-backref" href="#id23">replication.factor</a><a class="headerlink" href="#id2" title="Permalink to this headline"></a></h4>
<blockquote>
<div>See the <a class="reference internal" href="#replication-factor-parm"><span class="std std-ref">description here</span></a>.</div></blockquote>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");</code></pre>
</div>
</div>
</div>
</div>


@@ -620,6 +620,11 @@ public class StreamsConfig extends AbstractConfig {
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(NUM_STANDBY_REPLICAS_CONFIG,
Type.INT,
0,
Importance.HIGH,
NUM_STANDBY_REPLICAS_DOC)
.define(STATE_DIR_CONFIG,
Type.STRING,
System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams",
@@ -701,11 +706,6 @@ public class StreamsConfig extends AbstractConfig {
atLeast(1),
Importance.MEDIUM,
MAX_WARMUP_REPLICAS_DOC)
.define(NUM_STANDBY_REPLICAS_CONFIG,
Type.INT,
0,
Importance.MEDIUM,
NUM_STANDBY_REPLICAS_DOC)
.define(NUM_STREAM_THREADS_CONFIG,
Type.INT,
1,