<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<!-- h1>Developer Guide for Kafka Streams</h1 -->
<div class="sub-nav-sticky">
<div class="sticky-top">
<!-- div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
</div -->
</div>
</div>
<div class="section" id="configuring-a-streams-application">
<span id="streams-developer-guide-configuration"></span><h1>Configuring a Streams Application<a class="headerlink" href="#configuring-a-streams-application" title="Permalink to this headline"></a></h1>
<p>Kafka and Kafka Streams configuration options must be configured before using Streams. You can configure Kafka Streams by specifying parameters in a <code class="docutils literal"><span class="pre">java.util.Properties</span></code> instance.</p>
<ol class="arabic">
<li><p class="first">Create a <code class="docutils literal"><span class="pre">java.util.Properties</span></code> instance.</p>
</li>
<li><p class="first">Set the <a class="reference internal" href="#streams-developer-guide-required-configs"><span class="std std-ref">parameters</span></a>. For example:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Any further settings
settings.put(... , ...);</code></pre>
</li>
</ol>
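<p>Once assembled, the configuration is typically passed to the <code class="docutils literal"><span class="pre">KafkaStreams</span></code> constructor together with the topology. Continuing from the <code class="docutils literal"><span class="pre">settings</span></code> created above, a minimal sketch (the topology itself is only indicated):</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

StreamsBuilder builder = new StreamsBuilder();
// ... define your processing topology here ...
Topology topology = builder.build();

// Pass the settings assembled above to the Streams client
KafkaStreams streams = new KafkaStreams(topology, settings);
streams.start();</code></pre>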
<div class="section" id="configuration-parameter-reference">
<span id="streams-developer-guide-required-configs"></span><h2>Configuration parameter reference<a class="headerlink" href="#configuration-parameter-reference" title="Permalink to this headline"></a></h2>
<p>This section contains the most common Streams configuration parameters. For a full reference, see the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> Javadocs.</p>
<div class="contents local topic" id="contents">
<ul class="simple">
<li><a class="reference internal" href="#required-configuration-parameters" id="id3">Required configuration parameters</a>
<ul>
<li><a class="reference internal" href="#application-id" id="id4">application.id</a></li>
<li><a class="reference internal" href="#bootstrap-servers" id="id5">bootstrap.servers</a></li>
</ul>
</li>
<li><a class="reference internal" href="#recommended-configuration-parameters-for-resiliency" id="id21">Recommended configuration parameters for resiliency</a>
<ul>
<li><a class="reference internal" href="#acks-short" id="id22">acks</a></li>
<li><a class="reference internal" href="#replication-factor-short" id="id23">replication.factor</a></li>
<li><a class="reference internal" href="#min-isr-short" id="id44">min.insync.replicas</a></li>
<li><a class="reference internal" href="#num-standby-replicas-short" id="id45">num.standby.replicas</a></li>
</ul>
</li>
<li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a>
<ul>
<li><a class="reference internal" href="#acceptable-recovery-lag" id="id27">acceptable.recovery.lag</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">default.deserialization.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">default.production.exception.handler (deprecated since 4.0)</a></li>
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">deserialization.exception.handler</a></li>
<li><a class="reference internal" href="#enable-metrics-push" id="id43">enable.metrics.push</a></li>
<li><a class="reference internal" href="#ensure-explicit-internal-resource-naming" id="id46">ensure.explicit.internal.resource.naming</a></li>
<li><a class="reference internal" href="#group-protocol" id="id47">group.protocol</a></li>
<li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
<li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
<li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
<li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
<li><a class="reference internal" href="#num-stream-threads" id="id11">num.stream.threads</a></li>
<li><a class="reference internal" href="#probing-rebalance-interval-ms" id="id30">probing.rebalance.interval.ms</a></li>
<li><a class="reference internal" href="#processing-exception-handler" id="id41">processing.exception.handler</a></li>
<li><a class="reference internal" href="#processing-guarantee" id="id25">processing.guarantee</a></li>
<li><a class="reference internal" href="#processor-wrapper-class" id="id42">processor.wrapper.class</a></li>
<li><a class="reference internal" href="#production-exception-handler" id="id24">production.exception.handler</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-non-overlap-cost" id="id37">rack.aware.assignment.non_overlap_cost</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-strategy" id="id35">rack.aware.assignment.strategy</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-tags" id="id34">rack.aware.assignment.tags</a></li>
<li><a class="reference internal" href="#rack-aware-assignment-traffic-cost" id="id36">rack.aware.assignment.traffic_cost</a></li>
<li><a class="reference internal" href="#replication-factor" id="id13">replication.factor</a></li>
<li><a class="reference internal" href="#rocksdb-config-setter" id="id20">rocksdb.config.setter</a></li>
<li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
<li><a class="reference internal" href="#task-assignor-class" id="id39">task.assignor.class</a></li>
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
<li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
</ul>
</li>
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
<ul>
<li><a class="reference internal" href="#naming" id="id17">Naming</a></li>
<li><a class="reference internal" href="#default-values" id="id18">Default Values</a></li>
<li><a class="reference internal" href="#parameters-controlled-by-kafka-streams" id="id26">Parameters controlled by Kafka Streams</a></li>
<li><a class="reference internal" href="#enable-auto-commit" id="id19">enable.auto.commit</a></li>
</ul>
</li>
</ul>
</div>
<div class="section" id="required-configuration-parameters">
<h3><a class="toc-backref" href="#id3">Required configuration parameters</a><a class="headerlink" href="#required-configuration-parameters" title="Permalink to this headline"></a></h3>
<p>Here are the required Streams configuration parameters.</p>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-odd"><th class="head">Parameter Name</th>
<th class="head">Importance</th>
<th class="head" colspan="2">Description</th>
<th class="head">Default Value</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>application.id</td>
<td>Required</td>
<td colspan="2">An identifier for the stream processing application. Must be unique within the Kafka cluster.</td>
<td>None</td>
</tr>
<tr class="row-odd"><td>bootstrap.servers</td>
<td>Required</td>
<td colspan="2">A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.</td>
<td>None</td>
</tr>
</tbody>
</table>
<div class="section" id="application-id">
<h4><a class="toc-backref" href="#id4">application.id</a><a class="headerlink" href="#application-id" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>(Required) The application ID. Each stream processing application must have a unique ID. The same ID must be given to
all instances of the application. It is recommended to use only alphanumeric characters, <code class="docutils literal"><span class="pre">.</span></code> (dot), <code class="docutils literal"><span class="pre">-</span></code> (hyphen), and <code class="docutils literal"><span class="pre">_</span></code> (underscore). Examples: <code class="docutils literal"><span class="pre">&quot;hello_world&quot;</span></code>, <code class="docutils literal"><span class="pre">&quot;hello_world-v1.0.0&quot;</span></code></p>
<p>This ID is used in the following places to isolate resources used by the application from others:</p>
<ul class="simple">
<li>As the default Kafka consumer and producer <code class="docutils literal"><span class="pre">client.id</span></code> prefix</li>
<li>As the Kafka consumer <code class="docutils literal"><span class="pre">group.id</span></code> for coordination</li>
<li>As the name of the subdirectory in the state directory (cf. <code class="docutils literal"><span class="pre">state.dir</span></code>)</li>
<li>As the prefix of internal Kafka topic names</li>
</ul>
<dl class="docutils">
<dt>Tip:</dt>
<dd>When an application is updated, the <code class="docutils literal"><span class="pre">application.id</span></code> should be changed unless you want to reuse the existing data in internal topics and state stores.
For example, you could embed the version information within <code class="docutils literal"><span class="pre">application.id</span></code>, as <code class="docutils literal"><span class="pre">my-app-v1.0.0</span></code> and <code class="docutils literal"><span class="pre">my-app-v1.0.2</span></code>.</dd>
</dl>
</div></blockquote>
</div>
<div class="section" id="bootstrap-servers">
<h4><a class="toc-backref" href="#id5">bootstrap.servers</a><a class="headerlink" href="#bootstrap-servers" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>(Required) The Kafka bootstrap servers. This is the same <a class="reference external" href="/{{version}}/documentation.html#producerconfigs">setting</a> that is used by the underlying producer and consumer clients to connect to the Kafka cluster.
Example: <code class="docutils literal"><span class="pre">&quot;kafka-broker1:9092,kafka-broker2:9092&quot;</span></code>.</p>
</div></blockquote>
</div>
</div>
<div class="section" id="recommended-configuration-parameters-for-resiliency">
<h3><a class="toc-backref" href="#id21">Recommended configuration parameters for resiliency</a><a class="headerlink" href="#recommended-configuration-parameters-for-resiliency" title="Permalink to this headline"></a></h3>
<p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:</p>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-even"><th class="head">Parameter Name</th>
<th class="head">Corresponding Client</th>
<th class="head">Default value</th>
<th class="head">Consider setting to</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-odd">
<td>acks</td>
<td>Producer (for version &lt;=2.8)</td>
<td><code class="docutils literal"><span class="pre">acks="1")</span></code></td>
<td><code class="docutils literal"><span class="pre">acks="all"</span></code></td>
</tr>
<tr class="row-even">
<td>replication.factor (for broker version 2.3 or older)</td>
<td>Streams</td>
<td><code class="docutils literal"><span class="pre">-1</span></code></td>
<td><code class="docutils literal"><span class="pre">3</span></code> (broker 2.4+: ensure broker config <code>default.replication.factor=3</code>)</td>
</tr>
<tr class="row-odd">
<td>min.insync.replicas</td>
<td>Broker</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
<tr class="row-even">
<td>num.standby.replicas</td>
<td>Streams</td>
<td><code class="docutils literal"><span class="pre">0</span></code></td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
</tr>
</tbody>
</table>
<p>Increasing the replication factor to 3 ensures that the internal Kafka Streams topics can tolerate up to 2 broker failures.
The tradeoff of moving from the default values to the recommended ones is
that some performance and more storage space (3x with a replication factor of 3) are sacrificed for additional resiliency.</p>
<div class="section" id="acks-short">
<h4><a class="toc-backref" href="#id22">acks</a><a class="headerlink" href="#acks" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The number of acknowledgments that the leader must have received before considering a request complete. This controls
the durability of records that are sent. The possible values are:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">acks="0"</span></code> The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the producer won&#8217;t generally know of any failures. The offset returned for each record will always be set to <code class="docutils literal"><span class="pre">-1</span></code>.</li>
<li><code class="docutils literal"><span class="pre">acks="1"</span></code> The leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost.</li>
<li><code class="docutils literal"><span class="pre">acks="all"</span></code> (default since 3.0 release) The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee.</li>
</ul>
<p>For more information, see the <a class="reference external" href="https://kafka.apache.org/documentation/#producerconfigs">Kafka Producer documentation</a>.</p>
</div></blockquote>
</div>
<div class="section" id="replication-factor-short">
<h4><a class="toc-backref" href="#id23">replication.factor</a><a class="headerlink" href="#id23" title="Permalink to this headline"></a></h4>
<blockquote>
<div>See the <a class="reference internal" href="#replication-factor-parm"><span class="std std-ref">description here</span></a>.</div></blockquote>
</div>
<div class="section" id="min-isr-short">
<h4><a class="toc-backref" href="#id44">min.insync.replicas</a><a class="headerlink" href="#id44" title="Permalink to this headline"></a></h4>
<p>The minimum number of in-sync replicas that must acknowledge a write when the producer is configured with <code>acks="all"</code>
(see <a href="/{{version}}/documentation/#topicconfigs_min.insync.replicas">topic configs</a>).
</p>
</div>
<div class="section" id="num-standby-replicas-short">
<h4><a class="toc-backref" href="#id45">num.standby.replicas</a><a class="headerlink" href="#id45" title="Permalink to this headline"></a></h4>
<blockquote>
<div>See the <a class="reference internal" href="#streams-developer-guide-standby-replicas"><span class="std std-ref">description here</span></a>.</div></blockquote>
</div>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// for broker version 2.3 or older
//streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// for version 2.8 or older
//streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</div>
<div class="section" id="optional-configuration-parameters">
<span id="streams-developer-guide-optional-configs"></span><h3><a class="toc-backref" href="#id6">Optional configuration parameters</a><a class="headerlink" href="#optional-configuration-parameters" title="Permalink to this headline"></a></h3>
<p>Here are the optional <a href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> configuration parameters, sorted by level of importance:</p>
<blockquote>
<div><ul class="simple">
<li>High: These are parameters with a default value which is most likely not a good fit for production use. It's highly recommended to revisit these parameters for production usage.</li>
<li>Medium: The default values of these parameters should work for production for many cases, but it's not uncommon that they are changed, for example to tune performance.</li>
<li>Low: It should rarely be necessary to change the value for these parameters. It's only recommended to change them if there is a very specific issue you want to address.</li>
</ul>
</div></blockquote>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-odd"><th class="head">Parameter Name</th>
<th class="head">Importance</th>
<th class="head" colspan="2">Description</th>
<th class="head">Default Value</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>acceptable.recovery.lag</td>
<td>Medium</td>
<td colspan="2">The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task.</td>
<td><code class="docutils literal"><span class="pre">10000</span></code></td>
</tr>
<tr class="row-odd"><td>application.server</td>
<td>Low</td>
<td colspan="2">A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of
state stores within a single Kafka Streams application. The value of this must be different for each instance
of the application.</td>
<td>the empty string</td>
</tr>
<tr class="row-even"><td>buffered.records.per.partition</td>
<td>Low</td>
<td colspan="2">The maximum number of records to buffer per partition.</td>
<td><code class="docutils literal"><span class="pre">1000</span></code></td>
</tr>
<tr class="row-odd"><td>statestore.cache.max.bytes</td>
<td>Medium</td>
<td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td>
<td><code class="docutils literal"><span class="pre">10485760</span></code></td>
</tr>
<tr class="row-even"><td>cache.max.bytes.buffering (Deprecated. Use statestore.cache.max.bytes instead.)</td>
<td>Medium</td>
<td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td>
<td><code class="docutils literal"><span class="pre">10485760</span></code></td>
</tr>
<tr class="row-odd"><td>client.id</td>
<td>Medium</td>
<td colspan="2">An ID string to pass to the server when making requests.
(This setting is passed to the consumer/producer clients used internally by Kafka Streams.)</td>
<td>the empty string</td>
</tr>
<tr class="row-even"><td>commit.interval.ms</td>
<td>Low</td>
<td colspan="2">The frequency in milliseconds with which to save the position (offsets in source topics) of tasks.</td>
<td><code class="docutils literal"><span class="pre">30000</span></code> (30 seconds)</td>
</tr>
<tr class="row-odd"><td>default.deserialization.exception.handler (Deprecated. Use deserialization.exception.handler instead.)</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndContinueExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>default.key.serde</td>
<td>Medium</td>
<td colspan="2">Default serializer/deserializer class for record keys, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface. Must be
set by the user or all serdes must be passed in explicitly (see also default.value.serde).</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-odd"><td>default.production.exception.handler (Deprecated. Use production.exception.handler instead.)</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>default.timestamp.extractor</td>
<td>Medium</td>
<td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface.
See <a class="reference internal" href="#streams-developer-guide-timestamp-extractor"><span class="std std-ref">Timestamp Extractor</span></a></td>
<td><code class="docutils literal"><span class="pre">FailOnInvalidTimestamp</span></code></td>
</tr>
<tr class="row-odd"><td>default.value.serde</td>
<td>Medium</td>
<td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface. Must be
set by the user or all serdes must be passed in explicitly (see also default.key.serde).</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-even"><td>default.dsl.store</td>
<td>Low</td>
<td colspan="2">
[DEPRECATED] The default state store type used by DSL operators. Deprecated in
favor of <code>dsl.store.suppliers.class</code>
</td>
<td><code class="docutils literal"><span class="pre">"ROCKS_DB"</span></code></td>
</tr>
<tr class="row-odd"><td>deserialization.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndContinueExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>dsl.store.suppliers.class</td>
<td>Low</td>
<td colspan="2">
Defines a default state store implementation to be used by any stateful DSL operator
that has not explicitly configured the store implementation type. Must implement
the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.
</td>
<td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td>
</tr>
<tr class="row-odd"><td>ensure.explicit.internal.resource.naming</td>
<td>High</td>
<td colspan="2">
Whether to enforce explicit naming for all internal resources of the topology, including internal
topics (e.g., changelog and repartition topics) and their associated state stores.
When enabled, the application will refuse to start if any internal resource has an auto-generated name.
</td>
<td><code class="docutils literal"><span class="pre">false</span></code></td>
</tr>
<tr class="row-even"><td>log.summary.interval.ms</td>
<td>Low</td>
<td colspan="2">The output interval in milliseconds for logging summary information (disabled if negative).</td>
<td><code class="docutils literal"><span class="pre">120000</span></code> (2 minutes)</td>
</tr>
<tr class="row-odd"><td>enable.metrics.push</td>
<td>Low</td>
<td colspan="2">Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client.</td>
<td><code class="docutils literal"><span class="pre">true</span></code></td>
</tr>
<tr class="row-even"><td>max.task.idle.ms</td>
<td>Medium</td>
<td colspan="2">
<p>
This config controls whether joins and merges may produce out-of-order results.
The config value is the maximum amount of time in milliseconds a stream task will stay idle
when it is fully caught up on some (but not all) input partitions
to wait for producers to send additional records and avoid potential
out-of-order record processing across multiple input streams.
The default (zero) does not wait for producers to send more records,
but it does wait to fetch data that is already present on the brokers.
This default means that for records that are already present on the brokers,
Streams will process them in timestamp order.
Set to -1 to disable idling entirely and process any locally available data,
even though doing so may produce out-of-order processing.
</p>
</td>
<td><code class="docutils literal"><span class="pre">0</span></code></td>
</tr>
<tr class="row-odd"><td>max.warmup.replicas</td>
<td>Medium</td>
<td colspan="2">The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
<tr class="row-even"><td>metric.reporters</td>
<td>Low</td>
<td colspan="2">A list of classes to use as metrics reporters.</td>
<td>the empty list</td>
</tr>
<tr class="row-odd"><td>metrics.num.samples</td>
<td>Low</td>
<td colspan="2">The number of samples maintained to compute metrics.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
<tr class="row-even"><td>metrics.recording.level</td>
<td>Low</td>
<td colspan="2">The highest recording level for metrics.</td>
<td><code class="docutils literal"><span class="pre">INFO</span></code></td>
</tr>
<tr class="row-odd"><td>metrics.sample.window.ms</td>
<td>Low</td>
<td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td>
<td><code class="docutils literal"><span class="pre">30000</span></code> (30 seconds)</td>
</tr>
<tr class="row-even"><td>num.standby.replicas</td>
<td>High</td>
<td colspan="2">The number of standby replicas for each task.</td>
<td><code class="docutils literal"><span class="pre">0</span></code></td>
</tr>
<tr class="row-odd"><td>num.stream.threads</td>
<td>Medium</td>
<td colspan="2">The number of threads to execute stream processing.</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
</tr>
<tr class="row-even"><td>probing.rebalance.interval.ms</td>
<td>Low</td>
<td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td>
<td><code class="docutils literal"><span class="pre">600000</span></code> (10 minutes)</td>
</tr>
<tr class="row-odd"><td>processing.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProcessingExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndFailProcessingExceptionHandler</span></code></td>
</tr>
<tr class="row-even"><td>processing.guarantee</td>
<td>Medium</td>
<td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code>
or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+).
See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a>.</td>.
<td><code class="docutils literal"><span class="pre">"at_least_once"</span></code></td>
</tr>
<tr class="row-odd"><td>processor.wrapper.class</td>
<td>Medium</td>
<td colspan="2">A class or class name implementing the <code class="docutils literal"><span class="pre">ProcessorWrapper</span></code> interface.
Must be passed in when creating the topology, and will not be applied unless passed in to the appropriate constructor as a TopologyConfig. You should
use the <code class="docutils literal"><span class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for DSL applications, and the
<code class="docutils literal"><span class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI applications.</td>
</tr>
<tr class="row-even"><td>production.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
</tr>
<tr class="row-odd"><td>poll.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
<td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr>
<tr class="row-even"><td>rack.aware.assignment.strategy</td>
<td>Low</td>
<td colspan="2">The strategy used for rack aware assignment. Acceptable value are
<code class="docutils literal"><span class="pre">"none"</span></code> (default),
<code class="docutils literal"><span class="pre">"min_traffic"</span></code>, and
<code class="docutils literal"><span class="pre">"balance_suttopology"</span></code>.
See <a class="reference internal" href="#rack-aware-assignment-strategy"><span class="std std-ref">Rack Aware Assignment Strategy</span></a>.</td>
<td><code class="docutils literal"><span class="pre">"none"</span></code></td>
</tr>
<tr class="row-odd"><td>rack.aware.assignment.tags</td>
<td>Low</td>
<td colspan="2">List of tag keys used to distribute standby replicas across Kafka Streams
clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over
clients with different tag values.
See <a class="reference internal" href="#rack-aware-assignment-tags"><span class="std std-ref">Rack Aware Assignment Tags</span></a>.</td>
<td>the empty list</td>
</tr>
<tr class="row-even"><td>rack.aware.assignment.non_overlap_cost</td>
<td>Low</td>
<td colspan="2">Cost associated with moving tasks from existing assignment.
See <a class="reference internal" href="#rack-aware-assignment-non-overlap-cost"><span class="std std-ref">Rack Aware Assignment Non-Overlap-Cost</span></a>.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-odd"><td>rack.aware.assignment.non_overlap_cost</td>
<td>Low</td>
<td colspan="2">Cost associated with cross rack traffic.
See <a class="reference internal" href="#rack-aware-assignment-traffic-cost"><span class="std std-ref">Rack Aware Assignment Traffic-Cost</span></a>.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-even"><td>replication.factor</td>
<td>Medium</td>
<td colspan="2">The replication factor for changelog topics and repartition topics created by the application.
The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td>
<td><code class="docutils literal"><span class="pre">-1</span></code></td>
</tr>
<tr class="row-odd"><td>retry.backoff.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds, before a request is retried.</td>
<td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr>
<tr class="row-even"><td>rocksdb.config.setter</td>
<td>Medium</td>
<td colspan="2">The RocksDB configuration.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-odd"><td>state.cleanup.delay.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
<td><code class="docutils literal"><span class="pre">600000</span></code></td> (10 minutes)</td>
</tr>
<tr class="row-even"><td>state.dir</td>
<td>High</td>
<td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
</tr>
<tr class="row-odd"><td>task.assignor.class</td>
<td>Medium</td>
<td colspan="2">A task assignor class or class name implementing the <code>TaskAssignor</code> interface.</td>
<td>The high-availability task assignor.</td>
</tr>
<tr class="row-even"><td>task.timeout.ms</td>
<td>Medium</td>
<td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
<td><code class="docutils literal"><span class="pre">300000</span></code></td> (5 minutes)</td>
</tr>
<tr class="row-odd"><td>topology.optimization</td>
<td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>),
<code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>). </td>
<td><code class="docutils literal"><span class="pre">"NO_OPTIMIZATION"</span></code></td>
</tr>
<tr class="row-even"><td>upgrade.from</td>
<td>Medium</td>
<td colspan="2">The version you are upgrading from during a rolling upgrade.
See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td><code class="docutils literal"><span class="pre">86400000</span></code></td> (1 day)</td>
</tr>
<tr class="row-even"><td>window.size.ms</td>
<td>Low</td>
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
</tbody>
</table>
<div class="section" id="acceptable-recovery-lag">
<h4><a class="toc-backref" href="#id27">acceptable.recovery.lag</a><a class="headerlink" href="#acceptable-recovery-lag" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign
stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances
that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.
</p>
<p>
Note: if you set this to <code>Long.MAX_VALUE</code> it effectively disables the warmup replicas and task high availability, allowing Streams to immediately produce a balanced
assignment and migrate tasks to a new instance without first warming them up.
</p>
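<p>A brief sketch of tuning this parameter via the corresponding <code class="docutils literal"><span class="pre">StreamsConfig</span></code> constant (the values shown are illustrative):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Treat instances as caught up if they are within 50,000 offsets of the active task's state
settings.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 50_000L);
// Or: effectively disable warmup replicas and high-availability task migration
// settings.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, Long.MAX_VALUE);</code></pre>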
</div>
</blockquote>
</div>
<div class="section" id="deserialization-exception-handler">
<span id="streams-developer-guide-deh"></span><h4><a class="toc-backref" href="#id7">deserialization.exception.handler (deprecated: default.deserialization.exception.handler)</a><a class="headerlink" href="#deserialization-exception-handler" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The deserialization exception handler allows you to manage records that fail to deserialize. This
can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception
handler needs to return <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning
<code>FAIL</code> signals that Streams should shut down, and <code>CONTINUE</code> signals that Streams should ignore the issue
and continue processing. The following library built-in exception handlers are available:</p>
<ul class="simple">
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.html">LogAndContinueExceptionHandler</a>:
This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records.
This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail
to deserialize.</li>
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html">LogAndFailExceptionHandler</a>.
This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records.</li>
</ul>
<p>You can also provide your own customized exception handler besides the library provided ones to meet your needs. For example, you can choose to forward corrupt
records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write a corrupted record directly to
the quarantine topic. To be more concrete, you can create a separate <code>KafkaProducer</code> object outside the Streams client, and pass in this object
as well as the dead letter queue topic name into the <code>Properties</code> map, which then can be retrieved from the <code>configure</code> function call.
The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
<pre class="line-numbers"><code class="language-java">public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
KafkaProducer&lt;byte[], byte[]&gt; dlqProducer;
String dlqTopic;
@Override
public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
final ConsumerRecord&lt;byte[], byte[]&gt; record,
final Exception exception) {
log.warn("Exception caught during Deserialization, sending to the dead queue topic; " +
"taskId: {}, topic: {}, partition: {}, offset: {}",
context.taskId(), record.topic(), record.partition(), record.offset(),
exception);
dlqProducer.send(new ProducerRecord&lt;&gt;(dlqTopic, record.timestamp(), record.key(), record.value(), record.headers())).get();
return DeserializationHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map&lt;String, ?&gt; configs) {
dlqProducer = .. // get a producer from the configs map
dlqTopic = .. // get the topic name from the configs map
}
}</code></pre>
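<p>To use such a handler, register it under the <code class="docutils literal"><span class="pre">deserialization.exception.handler</span></code> config. A sketch of the registration; the <code>"dlq.topic"</code> key is an arbitrary, application-defined property that the handler reads back in its <code>configure()</code> method:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;

Properties settings = new Properties();
// other Kafka Streams settings, e.g. bootstrap servers, application id, etc.
settings.put("deserialization.exception.handler", SendToDeadLetterQueueExceptionHandler.class.getName());
// application-defined entry, read back in SendToDeadLetterQueueExceptionHandler#configure()
settings.put("dlq.topic", "my-app-dead-letter-queue");</code></pre>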
</div></blockquote>
</div>
<div class="section" id="production-exception-handler">
<span id="streams-developer-guide-peh"></span><h4><a class="toc-backref" href="#id24">production.exception.handler (deprecated: default.production.exception.handler)</a><a class="headerlink" href="#production-exception-handler" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The production exception handler allows you to manage exceptions triggered when trying to interact with a broker
such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>
that always fails when these exceptions occur.</p>
<p>An exception handler can return <code>FAIL</code>, <code>CONTINUE</code>, or <code>RETRY</code> depending on the record and the exception thrown. Returning <code>FAIL</code> will signal that Streams should shut down. <code>CONTINUE</code> will signal that Streams
should ignore the issue and continue processing. For <code>RetriableException</code> the handler may return <code>RETRY</code> to tell the runtime to retry sending the failed record (<b>Note:</b> If <code>RETRY</code> is returned for a non-<code>RetriableException</code>
it will be treated as <code>FAIL</code>.) If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
public void configure(Map&lt;String, Object&gt; config) {}
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord&lt;byte[], byte[]&gt; record,
final Exception exception) {
if (exception instanceof RecordTooLargeException) {
return ProductionExceptionHandlerResponse.CONTINUE;
} else {
return ProductionExceptionHandlerResponse.FAIL;
}
}
}
Properties settings = new Properties();
// other various kafka streams settings, e.g. bootstrap servers, application id, etc
settings.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
IgnoreRecordTooLargeHandler.class);</code></pre></div>
</blockquote>
</div>
<div class="section" id="timestamp-extractor">
<span id="streams-developer-guide-timestamp-extractor"></span><h4><a class="toc-backref" href="#id15">default.timestamp.extractor</a><a class="headerlink" href="#timestamp-extractor" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>A timestamp extractor pulls a timestamp from an instance of <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.
Timestamps are used to control the progress of streams.</p>
<p>The default extractor is
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.html">FailOnInvalidTimestamp</a>.
This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer
client since
<a class="reference external" href="https://cwiki.apache.org/confluence/x/eaSnAw">Kafka version 0.10</a>.
Depending on the setting of Kafka&#8217;s server-side <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> broker and <code class="docutils literal"><span class="pre">message.timestamp.type</span></code> topic parameters,
this extractor provides you with:</p>
<ul class="simple">
<li><strong>event-time</strong> processing semantics if <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> is set to <code class="docutils literal"><span class="pre">CreateTime</span></code> aka &#8220;producer time&#8221;
(which is the default). This represents the time when a Kafka producer sent the original message. If you use Kafka&#8217;s
official producer client, the timestamp represents milliseconds since the epoch.</li>
<li><strong>ingestion-time</strong> processing semantics if <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> is set to <code class="docutils literal"><span class="pre">LogAppendTime</span></code> aka &#8220;broker
time&#8221;. This represents the time when the Kafka broker received the original message, in milliseconds since the epoch.</li>
</ul>
<p>The <code class="docutils literal"><span class="pre">FailOnInvalidTimestamp</span></code> extractor throws an exception if a record contains an invalid (i.e. negative) built-in
timestamp, because Kafka Streams would not process this record but silently drop it. Invalid built-in timestamps can
occur for various reasons: if for example, you consume a topic that is written to by pre-0.10 Kafka producer clients
or by third-party producer clients that don&#8217;t support the new Kafka 0.10 message format yet; another situation where
this may happen is after upgrading your Kafka cluster from <code class="docutils literal"><span class="pre">0.9</span></code> to <code class="docutils literal"><span class="pre">0.10</span></code>, where all the data that was generated
with <code class="docutils literal"><span class="pre">0.9</span></code> does not include the <code class="docutils literal"><span class="pre">0.10</span></code> message timestamps.</p>
<p>If you have data with invalid timestamps and want to process it, then there are two alternative extractors available.
Both work on built-in timestamps, but handle invalid timestamps differently.</p>
<ul class="simple">
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.html">LogAndSkipOnInvalidTimestamp</a>:
This extractor logs a warn message and returns the invalid timestamp to Kafka Streams, which will not process but
silently drop the record.
This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records with an
invalid built-in timestamp in your input data.</li>
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/UsePartitionTimeOnInvalidTimestamp.html">UsePartitionTimeOnInvalidTimestamp</a>.
This extractor returns the record&#8217;s built-in timestamp if it is valid (i.e. not negative). If the record does not
have a valid built-in timestamp, the extractor returns the previously extracted valid timestamp from a record of the
same topic partition as the current record as a timestamp estimation. In case that no timestamp can be estimated, it
throws an exception.</li>
</ul>
<p>Another built-in extractor is
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/processor/WallclockTimestampExtractor.html">WallclockTimestampExtractor</a>.
This extractor does not actually &#8220;extract&#8221; a timestamp from the consumed record but rather returns the current time in
milliseconds from the system clock (think: <code class="docutils literal"><span class="pre">System.currentTimeMillis()</span></code>), which effectively means Streams will operate
on the basis of the so-called <strong>processing-time</strong> of events.</p>
<p>You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of
messages. If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or
estimate a timestamp. Returning a negative timestamp will result in data loss &#8211; the corresponding record will not be
processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via
<code class="docutils literal"><span class="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom
<code class="docutils literal"><span class="pre">TimestampExtractor</span></code> implementation:</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
// Extracts the embedded timestamp of a record (giving you &quot;event-time&quot; semantics).
public class MyEventTimeExtractor implements TimestampExtractor {
@Override
public long extract(final ConsumerRecord&lt;Object, Object&gt; record, final long previousTimestamp) {
// `Foo` is your own custom class, which we assume has a method that returns
// the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).
long timestamp = -1;
final Foo myPojo = (Foo) record.value();
if (myPojo != null) {
timestamp = myPojo.getTimestampInMillis();
}
if (timestamp &lt; 0) {
// Invalid timestamp! Attempt to estimate a new timestamp,
// otherwise fall back to wall-clock time (processing-time).
if (previousTimestamp &gt;= 0) {
return previousTimestamp;
} else {
return System.currentTimeMillis();
}
}
}
}</code></pre>
<p>You would then define the custom timestamp extractor in your Streams configuration as follows:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);</code></pre>
</div>
</blockquote>
</div>
<div class="section" id="default-key-serde">
<h4><a class="toc-backref" href="#id8">default.key.serde</a><a class="headerlink" href="#default-key-serde" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for record keys, null unless set by user. Serialization and deserialization in Kafka Streams happens
whenever data needs to be materialized, for example:</p>
<div><ul class="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
</ul>
<p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
</div>
</div></blockquote>
</div>
<div class="section" id="default-value-serde">
<h4><a class="toc-backref" href="#id9">default.value.serde</a><a class="headerlink" href="#default-value-serde" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for record values, null unless set by user. Serialization and deserialization in Kafka Streams
happens whenever data needs to be materialized, for example:</p>
<ul class="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
</ul>
<p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
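<p>A short sketch of configuring the default serdes; operation-level serdes, e.g. via <code class="docutils literal"><span class="pre">Consumed.with(...)</span></code>, always take precedence over these defaults:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serdes used whenever no serde is passed in explicitly
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());</code></pre>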
</div></blockquote>
</div>
<div class="section" id="ensure-explicit-internal-resource-naming">
<h4><a class="toc-backref" href="#id46">ensure.explicit.internal.resource.naming</a><a class="headerlink" href="#ensure-explicit-internal-resource-naming" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
Whether to enforce explicit naming for all internal resources of the topology, including internal
topics (e.g., changelog and repartition topics) and their associated state stores.
When enabled, the application will refuse to start if any internal resource has an auto-generated name.
</p>
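<p>A hedged sketch of a topology that passes this check; the explicit names are illustrative, and the property is set here via its literal name:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;

Properties settings = new Properties();
settings.put("ensure.explicit.internal.resource.naming", true);

StreamsBuilder builder = new StreamsBuilder();
// Name the source processor, the repartition topic (via Grouped), and the state store
// (via Materialized) so that no internal resource ends up with an auto-generated name.
KStream&lt;String, String&gt; words = builder.stream("input-topic", Consumed.as("input-source"));
KTable&lt;String, Long&gt; counts = words
    .groupBy((key, value) -&gt; value, Grouped.as("group-by-word"))
    .count(Named.as("count-words"), Materialized.as("word-counts-store"));</code></pre>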
</div>
</blockquote>
</div>
<div class="section" id="group-protocol">
<h4><a class="toc-backref" href="#id47">group.protocol</a><a class="headerlink" href="#group-protocol" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
The group protocol used by the Kafka Streams client for coordination.
It determines how the client will communicate with the Kafka brokers and other clients in the same group.
The default value is <code class="docutils literal"><span class="pre">"classic"</span></code>, which is the classic consumer group protocol.
Can be set to <code class="docutils literal"><span class="pre">"streams"</span></code> (requires broker-side enablement) to enable the new Kafka Streams group protocol.
Note that the "streams" rebalance protocol is an Early Access feature and should not be used in production.
</p>
</div>
</blockquote>
</div>
<div class="section" id="rack-aware-assignment-non-overlap-cost">
<h4><a class="toc-backref" href="#id37">rack.aware.assignment.non_overlap_cost</a><a class="headerlink" href="#rack-aware-assignment-non-overlap-cost" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
This configuration sets the cost of moving a task from the original assignment computed either by <code class="docutils literal"><span class="pre">StickyTaskAssignor</span></code> or
<code class="docutils literal"><span class="pre">HighAvailabilityTaskAssignor</span></code>. Together with <code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code>,
they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than <code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code>,
the optimizer will try to maintain the existing assignment computed by the task assignor. Note that the optimizer considers the ratio of these two configs when deciding whether to favor maintaining the existing assignment or minimizing traffic cost. For example, setting
<code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code> to 10 and <code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code> to 1 is more likely to maintain existing assignment than setting
<code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code> to 100 and <code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code> to 50.
</p>
<p>
The default value is null which means default <code class="docutils literal"><span class="pre">non_overlap_cost</span></code> in different assignors will be used. In <code class="docutils literal"><span class="pre">StickyTaskAssignor</span></code>, it has a default value of 10 and <code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code> has
a default value of 1, which means maintaining stickiness is preferred in <code class="docutils literal"><span class="pre">StickyTaskAssignor</span></code>. In <code class="docutils literal"><span class="pre">HighAvailabilityTaskAssignor</span></code>, it has a default value of 1 and <code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code> has
a default value of 10, which means minimizing cross rack traffic is preferred in <code class="docutils literal"><span class="pre">HighAvailabilityTaskAssignor</span></code>.
</p>
</div>
</blockquote>
</div>
<div class="section" id="rack-aware-assignment-strategy">
<h4><a class="toc-backref" href="#id35">rack.aware.assignment.strategy</a><a class="headerlink" href="#rack-aware-assignment-strategy" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
                    This configuration sets the strategy Kafka Streams uses for rack aware task assignment, so that cross rack traffic between brokers and Kafka Streams clients can be reduced. This config will only take effect when <code class="docutils literal"><span class="pre">broker.rack</span></code>
                    is set on the brokers and <code class="docutils literal"><span class="pre">client.rack</span></code> is set on the Kafka Streams side. There are three settings for this config:
                  </p>
                  <ul class="simple">
                    <li><code class="docutils literal"><span class="pre">none</span></code>. This is the default value, which means rack aware task assignment is disabled.</li>
                    <li><code class="docutils literal"><span class="pre">min_traffic</span></code>. This setting means that the rack aware task assigner will compute an assignment which tries to minimize cross rack traffic.</li>
                    <li><code class="docutils literal"><span class="pre">balance_subtopology</span></code>. This setting means that the rack aware task assigner will compute an assignment which tries to balance tasks from the same subtopology across different clients and minimizes cross rack traffic on top of that.</li>
</ul>
<p>
This config can be used together with <a class="reference internal" href="#rack-aware-assignment-non-overlap-cost">rack.aware.assignment.non_overlap_cost</a> and
<a class="reference internal" href="#rack-aware-assignment-traffic-cost">rack.aware.assignment.traffic_cost</a> to balance reducing cross rack traffic and maintaining the existing assignment.
</p>
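                  <p>
                    For illustration, below is a minimal sketch of enabling rack aware assignment via the literal config names. The <code class="docutils literal"><span class="pre">client.rack</span></code> value <code class="docutils literal"><span class="pre">euc-1a</span></code> is just an example, and the cost values only matter as a ratio, as described above.
                  </p>
                  <pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// The rack of this Kafka Streams client; broker.rack must be set on the brokers as well
streamsSettings.put("client.rack", "euc-1a");
// Compute an assignment that minimizes cross rack traffic
streamsSettings.put("rack.aware.assignment.strategy", "min_traffic");
// Optional: weight traffic reduction higher than assignment stickiness (only the ratio matters)
streamsSettings.put("rack.aware.assignment.traffic_cost", 10);
streamsSettings.put("rack.aware.assignment.non_overlap_cost", 1);</code></pre>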
</div>
</blockquote>
</div>
<div class="section" id="rack-aware-assignment-tags">
<h4><a class="toc-backref" href="#id34">rack.aware.assignment.tags</a><a class="headerlink" href="#rack-aware-assignment-tags" title="Permalink to this headline"></a>
</h4>
<blockquote>
<div>
<p>
This configuration sets a list of tag keys used to distribute standby replicas across Kafka Streams
clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over
clients with different tag values.
</p>
<p>
                Tags for the Kafka Streams clients can be set via the <code class="docutils literal"><span class="pre">client.tag.</span></code>
                prefix. Example:
</p>
<pre><code class="language-text">
Client-1 | Client-2
_______________________________________________________________________
client.tag.zone: eu-central-1a | client.tag.zone: eu-central-1b
client.tag.cluster: k8s-cluster1 | client.tag.cluster: k8s-cluster1
rack.aware.assignment.tags: zone,cluster | rack.aware.assignment.tags: zone,cluster
Client-3 | Client-4
_______________________________________________________________________
client.tag.zone: eu-central-1a | client.tag.zone: eu-central-1b
client.tag.cluster: k8s-cluster2 | client.tag.cluster: k8s-cluster2
rack.aware.assignment.tags: zone,cluster | rack.aware.assignment.tags: zone,cluster</code></pre>
<p>
In the above example, we have four Kafka Streams clients across two zones (<code class="docutils literal"><span class="pre">eu-central-1a</span></code>, <code class="docutils literal"><span class="pre">eu-central-1b</span></code>) and across two clusters (<code class="docutils literal"><span class="pre">k8s-cluster1</span></code>, <code class="docutils literal"><span class="pre">k8s-cluster2</span></code>).
For an active task located on <code class="docutils literal"><span class="pre">Client-1</span></code>, Kafka Streams will allocate a standby task on <code class="docutils literal"><span class="pre">Client-4</span></code>, since <code class="docutils literal"><span class="pre">Client-4</span></code> has a different <code class="docutils literal"><span class="pre">zone</span></code> and a different <code class="docutils literal"><span class="pre">cluster</span></code> than <code class="docutils literal"><span class="pre">Client-1</span></code>.
</p>
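                <p>
                  For example, <code class="docutils literal"><span class="pre">Client-1</span></code> from the table above could be configured as follows (a sketch using the literal config names and the tag values from the example):
                </p>
                <pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// Tag this Kafka Streams client with its zone and cluster
streamsSettings.put("client.tag.zone", "eu-central-1a");
streamsSettings.put("client.tag.cluster", "k8s-cluster1");
// Distribute standby tasks over clients with different values for these tags
streamsSettings.put("rack.aware.assignment.tags", "zone,cluster");</code></pre>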
</div>
</blockquote>
</div>
<div class="section" id="rack-aware-assignment-traffic-cost">
<h4><a class="toc-backref" href="#id36">rack.aware.assignment.traffic_cost</a><a class="headerlink" href="#rack-aware-assignment-traffic-cost" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
This configuration sets the cost of cross rack traffic. Together with <code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code>,
they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than <code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code>,
                  the optimizer will try to compute an assignment which minimizes the cross rack traffic. Note that the optimizer considers the ratio of these two configs when deciding whether to favor maintaining the existing assignment or minimizing traffic cost. For example, setting
<code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code> to 10 and <code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code> to 1 is more likely to minimize cross rack traffic than setting
<code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code> to 100 and <code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code> to 50.
</p>
<p>
                  The default value is null, which means the default traffic cost of the chosen assignor will be used. In <code class="docutils literal"><span class="pre">StickyTaskAssignor</span></code>, it has a default value of 1 and <code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code>
has a default value of 10. In <code class="docutils literal"><span class="pre">HighAvailabilityTaskAssignor</span></code>, it has a default value of 10 and <code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code> has a default value of 1.
</p>
</div>
</blockquote>
</div>
<div class="section" id="log-summary-interval-ms">
<h4><a class="toc-backref" href="#id40">log.summary.interval.ms</a><a class="headerlink" href="#log-summary-interval-ms" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
This configuration controls the output interval for summary information.
              If the value is greater than or equal to 0, the summary log will be output at the configured time interval;
              if the value is less than 0, summary output is disabled.
</div>
</blockquote>
</div>
<div class="section" id="enable-metrics-push">
<span id="streams-developer-guide-enable-metrics-push"></span><h4><a class="toc-backref" href="#id43">enable.metrics.push</a><a class="headerlink" href="#enable-metrics-push" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
              Kafka Streams metrics can be pushed to the brokers similar to client metrics.
              Additionally, Kafka Streams allows you to enable or disable metrics pushing for each embedded client individually.
              However, pushing Kafka Streams metrics requires that <code>enable.metrics.push</code> is enabled on the main consumer and the admin client.
</p>
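            <p>
              For example, to keep pushing Kafka Streams metrics while disabling metrics pushing for the embedded producer only, you can combine this config with the client prefixes described in the Naming section below (a sketch using the literal config name):
            </p>
            <pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// Metrics pushing is enabled by default; shown here for clarity
streamsSettings.put("enable.metrics.push", true);
// Disable metrics pushing for the embedded producer only
streamsSettings.put("producer.enable.metrics.push", false);</code></pre>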
</div>
</blockquote>
</div>
<div class="section" id="max-task-idle-ms">
<span id="streams-developer-guide-max-task-idle-ms"></span><h4><a class="toc-backref" href="#id28">max.task.idle.ms</a><a class="headerlink" href="#max-task-idle-ms" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
This configuration controls how long Streams will wait to fetch data in order to
provide in-order processing semantics.
</p>
<p>
When processing a task that has multiple input partitions (as in a join or merge),
Streams needs to choose which partition to process the next record from.
When all input partitions have locally buffered data, Streams picks the partition
whose next record has the lowest timestamp. This has the desirable effect of
collating the input partitions in timestamp order, which is generally what you
want in a streaming join or merge. However, when Streams does not have any data
buffered locally for one of the partitions, it does not know whether the next
record for that partition will have a lower or higher timestamp than the remaining
partitions' records.
</p>
<p>
There are two cases to consider: either there is data in that partition on the
broker that Streams has not fetched yet, or Streams is fully caught up with that
partition on the broker, and the producers simply haven't produced any new records
since Streams polled the last batch.
</p>
<p>
The default value of
<code class="docutils literal"><span class="pre">0</span></code>
causes Streams to delay processing a task when it detects that it has no locally
buffered data for a partition, but there is data available on the brokers.
Specifically, when there is an empty partition in the local buffer, but Streams
has a non-zero lag for that partition. However, as soon as Streams catches up to
the broker, it will continue processing, even if there is no data in one of the
partitions. That is, it will not wait for new data to be <em>produced</em>.
This default is designed to sacrifice some throughput in exchange for intuitively
correct join semantics.
</p>
<p>
Any config value greater than zero indicates the number of <em>extra</em>
milliseconds that Streams will wait if it has a caught-up but empty partition.
In other words, this is the amount of time to wait for new data to be produced
to the input partitions to ensure in-order processing of data in the event
of a slow producer.
</p>
<p>
The config value of
<code class="docutils literal"><span class="pre">-1</span></code>
indicates that Streams will never wait to buffer empty partitions before choosing
the next record by timestamp, which achieves maximum throughput at the expense of
introducing out-of-order processing.
</p>
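          <p>
            As an illustration, here is a minimal sketch of the three modes; the 5 second value is arbitrary:
          </p>
          <pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// Wait up to 5 extra seconds for a slow producer on a caught-up but empty partition
streamsSettings.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 5000L);
// Default behavior: only wait while there is unfetched data on the broker
// streamsSettings.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);
// Never wait: maximum throughput at the cost of possible out-of-order processing
// streamsSettings.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, -1L);</code></pre>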
</div>
</blockquote>
</div>
<div class="section" id="max-warmup-replicas">
<span id="streams-developer-guide-max-warmup-replicas"></span><h4><a class="toc-backref" href="#id29">max.warmup.replicas</a><a class="headerlink" href="#max-warmup-replicas" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
The maximum number of warmup replicas (extra standbys beyond the configured <code class="docutils literal"><span class="pre">num.standbys</span></code>) that can be assigned at once for the purpose of keeping
the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker
            traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, reducing the time
            it takes for the reassigned warmups to restore sufficient state to be transitioned to active tasks. Must be at least 1.
</p>
<p>
Note that one warmup replica corresponds to one <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">Stream Task</a>. Furthermore, note that each warmup task can only be promoted to an active task during
            a rebalance (normally during a so-called probing rebalance, which occurs at the frequency specified by the
            <code class="docutils literal"><span class="pre">probing.rebalance.interval.ms</span></code> config). This means that the
            maximum rate at which active tasks can be migrated from one Kafka Streams instance to another is given by
            (<code class="docutils literal"><span class="pre">max.warmup.replicas</span></code> /
            <code class="docutils literal"><span class="pre">probing.rebalance.interval.ms</span></code>).
            For example, with the default values of <code class="docutils literal"><span class="pre">max.warmup.replicas</span></code> = 2 and
            <code class="docutils literal"><span class="pre">probing.rebalance.interval.ms</span></code> = 10 minutes, at most 2 active tasks can be migrated every 10 minutes.
</p>
</div>
</blockquote>
</div>
<div class="section" id="num-standby-replicas">
<span id="streams-developer-guide-standby-replicas"></span><h4><a class="toc-backref" href="#id10">num.standby.replicas</a><a class="headerlink" href="#num-standby-replicas" title="Permalink to this headline"></a></h4>
<blockquote>
<div>The number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the
specified number of replicas per store and keep them up to date as long as there are enough instances running.
Standby replicas are used to minimize the latency of task failover. A task that was previously running on a failed instance is
preferred to restart on an instance that has standby replicas so that the local state store restoration process from its
changelog can be minimized. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of
resuming tasks on failover can be found in the <a class="reference internal" href="../architecture.html#streams_architecture_state"><span class="std std-ref">State</span></a> section.
<dl class="docutils">
<dt>Recommendation:</dt>
<dd>Increase the number of standbys to 1 to get instant fail-over, i.e., high-availability.
Increasing the number of standbys requires more client-side storage space.
For example, with 1 standby, 2x space is required.</dd>
</dl>
<dl class="docutils">
<dt>Note:</dt>
<dd>If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite> <code class="docutils literal"><span class="pre">KafkaStreams</span></code> instances.</dd>
</dl>
</div>
</blockquote>
</div>
<div class="section" id="num-stream-threads">
<h4><a class="toc-backref" href="#id11">num.stream.threads</a><a class="headerlink" href="#num-stream-threads" title="Permalink to this headline"></a></h4>
<blockquote>
          <div>This specifies the number of stream threads in an instance of the Kafka Streams application. The stream processing code runs in these threads.
            For more information about the Kafka Streams threading model, see <a class="reference internal" href="../architecture.html#streams_architecture_threads"><span class="std std-ref">Threading Model</span></a>.</div></blockquote>
</div>
<div class="section" id="probing-rebalance-interval-ms">
<h4><a class="toc-backref" href="#id30">probing.rebalance.interval.ms</a><a class="headerlink" href="#probing-rebalance-interval-ms" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
The maximum time to wait before triggering a rebalance to probe for warmup replicas that have restored enough to be considered caught up. Streams will only assign stateful active tasks to
instances that are caught up and within the <a class="reference internal" href="#acceptable-recovery-lag"><span class="std std-ref">acceptable.recovery.lag</span></a>, if any exist. Probing rebalances are used to query the latest total lag of warmup replicas and transition
them to active tasks if ready. They will continue to be triggered as long as there are warmup tasks, and until the assignment is balanced. Must be at least 1 minute.
</div></blockquote>
</div>
<div class="section" id="processing-exception-handler">
<span id="streams-developer-guide-proceh"></span><h4><a class="toc-backref" href="#id41">processing.exception.handler</a><a class="headerlink" href="#processing-exception-handler" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The processing exception handler allows you to manage exceptions triggered during the processing of a record. The implemented exception
handler needs to return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning
<code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams should ignore the issue
and continue processing. The following library built-in exception handlers are available:</p>
<ul class="simple">
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.html">LogAndContinueProcessingExceptionHandler</a>:
This handler logs the processing exception and then signals the processing pipeline to continue processing more records.
This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to be processed.</li>
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.html">LogAndFailProcessingExceptionHandler</a>.
This handler logs the processing exception and then signals the processing pipeline to stop processing more records.</li>
</ul>
<p>You can also provide your own customized exception handler besides the library provided ones to meet your needs. For example, you can choose to forward corrupt
records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write a corrupted record directly to
the quarantine topic. To be more concrete, you can create a separate <code>KafkaProducer</code> object outside the Streams client, and pass in this object
            as well as the dead letter queue topic name into the <code>Properties</code> map, which can then be retrieved from the <code>configure</code> method call.
The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
<pre class="line-numbers"><code class="language-java">public class SendToDeadLetterQueueExceptionHandler implements ProcessingExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(SendToDeadLetterQueueExceptionHandler.class);
    KafkaProducer&lt;byte[], byte[]&gt; dlqProducer;
    String dlqTopic;
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record&lt;?, ?&gt; record,
final Exception exception) {
log.warn("Exception caught during message processing, sending to the dead queue topic; " +
"processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
exception);
        dlqProducer.send(new ProducerRecord&lt;&gt;(dlqTopic, null, record.timestamp(), (byte[]) record.key(), (byte[]) record.value(), record.headers()));
return ProcessingHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map&lt;String, ?&gt; configs) {
dlqProducer = .. // get a producer from the configs map
dlqTopic = .. // get the topic name from the configs map
}
}</code></pre>
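          <p>To register the handler, set it like any other config; the dead letter queue topic key shown below is a hypothetical name that your <code>configure</code> implementation would read back:</p>
          <pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
streamsSettings.put("processing.exception.handler", SendToDeadLetterQueueExceptionHandler.class);
// Hypothetical custom key, read back in SendToDeadLetterQueueExceptionHandler#configure
streamsSettings.put("dead.letter.queue.topic", "my-dlq-topic");</code></pre>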
</div></blockquote>
</div>
<div class="section" id="processing-guarantee">
<span id="streams-developer-guide-processing-guarantee"></span><h4><a class="toc-backref" href="#id25">processing.guarantee</a><a class="headerlink" href="#processing-guarantee" title="Permalink to this headline"></a></h4>
<blockquote>
<div>The processing guarantee that should be used.
Possible values are <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default)
and <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2).
Deprecated config options are <code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS alpha),
and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2).
Using <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (or the deprecated
<code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>) requires broker version 2.5 or newer,
while using the deprecated <code class="docutils literal"><span class="pre">"exactly_once"</span></code>
requires broker version 0.11.0 or newer.
Note that if exactly-once processing is enabled, the default for parameter
<code class="docutils literal"><span class="pre">commit.interval.ms</span></code> changes to 100ms.
Additionally, consumers are configured with <code class="docutils literal"><span class="pre">isolation.level="read_committed"</span></code>
          and producers are configured with <code class="docutils literal"><span class="pre">enable.idempotence=true</span></code> by default.
          Note that by default exactly-once processing requires a cluster of at least three brokers, which is the recommended setting for production.
For development, you can change this configuration by adjusting broker setting
<code class="docutils literal"><span class="pre">transaction.state.log.replication.factor</span></code>
and <code class="docutils literal"><span class="pre">transaction.state.log.min.isr</span></code>
to the number of brokers you want to use.
For more details see <a href="../core-concepts#streams_processing_guarantee">Processing Guarantees</a>.
<dl class="docutils">
<dt>Recommendation:</dt>
<dd>While it is technically possible to use EOS with any replication factor, using a replication factor lower
than 3 effectively voids EOS. Thus it is strongly recommended to use a replication factor of 3 (together with
<code>min.in.sync.replicas=2</code>). This recommendation applies to all topics (i.e. <code>__transaction_state</code>,
<code>__consumer_offsets</code>, Kafka Streams internal topics, and user topics).</dd>
</dl>
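          <p>For example, a minimal sketch of enabling EOS version 2 in the Streams configuration (the broker-side transaction settings mentioned above are changed on the brokers, not here):</p>
          <pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Optional: override the 100ms commit interval that exactly-once processing implies by default
streamsSettings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);</code></pre>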
</div></blockquote>
</div>
<div class="section" id="processor-wrapper-class">
<span id="streams-developer-guide-processor-wrapper-class"></span><h4><a class="toc-backref" href="#id42">processor.wrapper.class</a><a class="headerlink" href="#processor-wrapper-class" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
A class or class name implementing the <code class="docutils literal"><span class="pre">ProcessorWrapper</span></code> interface. This feature allows you to wrap any of the
processors in the compiled topology, including both custom processor implementations and those created by Streams for DSL operators. This can be useful for logging or tracing
implementations since it allows access to the otherwise-hidden processor context for DSL operators, and also allows for injecting additional debugging information to an entire
application topology with just a single config.
</p>
<p>
IMPORTANT: This MUST be passed in when creating the topology, and will not be applied unless passed in to the appropriate topology-building constructor. You should
use the <code class="docutils literal"><span class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for DSL applications, and the
<code class="docutils literal"><span class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI applications.
</p>
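        <p>
          For illustration, here is a minimal sketch of wiring a wrapper into a DSL application; <code>MyProcessorWrapper</code> is a hypothetical class implementing <code>ProcessorWrapper</code>, and the config is set via its literal name:
        </p>
        <pre class="line-numbers"><code class="language-java">Properties props = new Properties();
// MyProcessorWrapper is a hypothetical ProcessorWrapper implementation
props.put("processor.wrapper.class", MyProcessorWrapper.class);
// The wrapper only takes effect if the config is passed into the topology-building constructor
TopologyConfig topologyConfig = new TopologyConfig(new StreamsConfig(props));
StreamsBuilder builder = new StreamsBuilder(topologyConfig);</code></pre>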
</div></blockquote>
</div>
<div class="section" id="replication-factor">
<span id="replication-factor-parm"></span><h4><a class="toc-backref" href="#id13">replication.factor</a><a class="headerlink" href="#replication-factor" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is
repartitioned for aggregation. Replication is important for fault tolerance. Without replication even a single broker failure
may prevent progress of the stream processing application. It is recommended to use a similar replication factor as source topics.</p>
<dl class="docutils">
<dt>Recommendation:</dt>
<dd>Increase the replication factor to 3 to ensure that the internal Kafka Streams topic can tolerate up to 2 broker failures.
Note that you will require more storage space as well (3x with the replication factor of 3).</dd>
</dl>
</div></blockquote>
</div>
<div class="section" id="rocksdb-config-setter">
<span id="streams-developer-guide-rocksdb-config"></span><h4><a class="toc-backref" href="#id20">rocksdb.config.setter</a><a class="headerlink" href="#rocksdb-config-setter" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default
configuration for RocksDB, you can implement <code class="docutils literal"><span class="pre">RocksDBConfigSetter</span></code> and provide your custom class via <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
<p>Here is an example that adjusts the memory size consumed by RocksDB.</p>
<pre class="line-numbers"><code class="language-java">public static class CustomRocksDBConfig implements RocksDBConfigSetter {
// This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);
@Override
public void setConfig(final String storeName, final Options options, final Map&lt;String, Object&gt; configs) {
// See #1 below.
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
tableConfig.setBlockCache(cache);
// See #2 below.
tableConfig.setBlockSize(16 * 1024L);
// See #3 below.
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setTableFormatConfig(tableConfig);
// See #4 below.
options.setMaxWriteBufferNumber(2);
}
@Override
public void close(final String storeName, final Options options) {
// See #5 below.
cache.close();
}
}
Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);</code></pre>
<dl class="docutils">
<dt>Notes for example:</dt>
<dd><ol class="first last arabic simple">
<li><code class="docutils literal"><span class="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <code class="docutils literal"><span class="pre">BloomFilter</span></code>, which is an important optimization.
<li><code class="docutils literal"><span class="pre">tableConfig.setBlockSize(16</span> <span class="pre">*</span> <span class="pre">1024L);</span></code> Modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><code class="docutils literal"><span class="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><code class="docutils literal"><span class="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
<li><code class="docutils literal"><span class="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
</ol>
</dd>
</dl>
</div>
</blockquote>
</div>
</div>
</blockquote>
</div>
<div class="section" id="state-dir">
<h4><a class="toc-backref" href="#id14">state.dir</a><a class="headerlink" href="#state-dir" title="Permalink to this headline"></a></h4>
<blockquote>
<div>The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting
machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated
with the application are created under this subdirectory. When running multiple instances of the same application on a single machine,
this path must be unique for each such instance.</div>
</blockquote>
</div>
<div class="section" id="task-assignor-class">
<h4><a class="toc-backref" href="#id39">task.assignor.class</a><a class="headerlink" href="#task-assignor-class" title="Permalink to this headline"></a></h4>
<blockquote>
<div>A task assignor class or class name implementing the
<code>org.apache.kafka.streams.processor.assignment.TaskAssignor</code> interface. Defaults to the
high-availability task assignor. One possible alternative implementation provided in Apache Kafka is the
<code>org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor</code>, which was the default task
assignor before KIP-441 and minimizes task movement at the cost of stateful task availability. Alternative implementations of
the task assignment algorithm can be plugged into the application by implementing a custom <code>TaskAssignor</code> and setting this config to the name of the custom task assignor class.
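      <p>For example, a sketch of switching back to the sticky assignor mentioned above, using the literal config name:</p>
      <pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
streamsSettings.put("task.assignor.class",
    "org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor");</code></pre>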
</div>
</blockquote>
</div>
<div class="section" id="topology-optimization">
<h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="headerlink" href="#topology-optimization" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>), <code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>).
</p>
<p>
We recommend listing specific optimizations in the config for production code so that the structure of your topology will not change unexpectedly during upgrades of the Streams library.
</p>
<p>
These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.
</p>
<p>
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
</p>
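        <p>
          For example, a sketch of enabling all optimizations and passing the properties into the topology build as described above:
        </p>
        <pre class="line-numbers"><code class="language-java">Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
StreamsBuilder builder = new StreamsBuilder();
// ... define your processing topology on the builder ...
// The properties must also be passed to build() for the optimizations to be applied
KafkaStreams streams = new KafkaStreams(builder.build(props), props);</code></pre>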
</div></blockquote>
</div>
<div class="section" id="windowed.inner.class.serde">
<h4><a class="toc-backref" href="#id31">windowed.inner.class.serde</a><a class="headerlink" href="#windowed.inner.class.serde" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface.
</p>
<p>
Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology.
</p>
</div></blockquote>
</div>
<div class="section" id="upgrade-from">
<span id="streams-developer-guide-upgrade-from"></span><h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide.
You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the
newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path
when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
</div>
</blockquote>
</div>
</div>
<div class="section" id="kafka-consumers-and-producer-configuration-parameters">
<h3><a class="toc-backref" href="#id16">Kafka consumers, producer and admin client configuration parameters</a><a class="headerlink" href="#kafka-consumers-and-producer-configuration-parameters" title="Permalink to this headline"></a></h3>
<p>You can specify parameters for the Kafka <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>, <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a>,
      and <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/admin/package-summary.html">admin client</a> that are used internally.
The consumer, producer and admin client settings are defined by specifying parameters in a <code class="docutils literal"><span class="pre">StreamsConfig</span></code> instance.</p>
<p>In this example, the Kafka <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer session timeout</a> is configured to be 60000 milliseconds in the Streams settings:</p>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// Example of a &quot;normal&quot; setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;kafka-broker-01:9092&quot;);
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);</code></pre>
</div>
<div class="section" id="naming">
<h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
      <p>Some consumer, producer, and admin client configuration parameters use the same parameter name, and the Kafka Streams library itself also uses some parameters that share the same name with its embedded clients. For example, <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> and
        <code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries for client requests.
        You can avoid duplicate names by prefixing parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code>, <code class="docutils literal"><span class="pre">producer.</span></code>, or <code class="docutils literal"><span class="pre">admin.</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// same value for consumer, producer, and admin client
streamsSettings.put(&quot;PARAMETER_NAME&quot;, &quot;value&quot;);
// different values for consumer and producer
streamsSettings.put(&quot;consumer.PARAMETER_NAME&quot;, &quot;consumer-value&quot;);
streamsSettings.put(&quot;producer.PARAMETER_NAME&quot;, &quot;producer-value&quot;);
streamsSettings.put(&quot;admin.PARAMETER_NAME&quot;, &quot;admin-value&quot;);
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix(&quot;PARAMETER_NAME&quot;), &quot;consumer-value&quot;);
streamsSettings.put(StreamsConfig.producerPrefix(&quot;PARAMETER_NAME&quot;), &quot;producer-value&quot;);
streamsSettings.put(StreamsConfig.adminClientPrefix(&quot;PARAMETER_NAME&quot;), &quot;admin-value&quot;);</code></pre>
<p>You could further separate consumer configuration by adding different prefixes:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">main.consumer.</span></code> for main consumer which is the default consumer of stream source.</li>
<li><code class="docutils literal"><span class="pre">restore.consumer.</span></code> for restore consumer which is in charge of state store recovery.</li>
<li><code class="docutils literal"><span class="pre">global.consumer.</span></code> for global consumer which is used in global KTable construction.</li>
</ul>
<p>For example, if you only want to set restore consumer config without touching other consumers' settings, you could simply use <code class="docutils literal"><span class="pre">restore.consumer.</span></code> to set the config.</p>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// set a different restore consumer config. This would make restore consumer take restore-consumer-value,
// while main consumer and global consumer stay with general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");</code></pre>
      <p> The same applies to <code class="docutils literal"><span class="pre">main.consumer.</span></code> and <code class="docutils literal"><span class="pre">global.consumer.</span></code>, if you only want to specify one consumer type's config.</p>
<p> Additionally, to configure the internal repartition/changelog topics, you could use the <code class="docutils literal"><span class="pre">topic.</span></code> prefix, followed by any of the standard topic configs.</p>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// Override default for both changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");</code></pre>
</div>
</div>
<div class="section" id="default-values">
<h4><a class="toc-backref" href="#id18">Default Values</a><a class="headerlink" href="#default-values" title="Permalink to this headline"></a></h4>
<p>Kafka Streams uses different default values for some of the underlying client configs, which are summarized below. For detailed descriptions
of these configs, see <a class="reference external" href="/{{version}}/documentation.html#producerconfigs">Producer Configs</a>
and <a class="reference external" href="/{{version}}/documentation.html#consumerconfigs">Consumer Configs</a>.</p>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-odd"><th class="head">Parameter Name</th>
<th class="head">Corresponding Client</th>
<th class="head">Streams Default</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>auto.offset.reset</td>
<td>Consumer</td>
<td><code class="docutils literal"><span class="pre">earliest</span></code></td>
</tr>
<tr class="row-odd"><td>linger.ms</td>
<td>Producer</td>
<td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr>
<tr class="row-even"><td>max.poll.records</td>
<td>Consumer</td>
<td><code class="docutils literal"><span class="pre">1000</span></code></td>
</tr>
<tr class="row-odd">
<td>client.id</td>
<td>-</td>
<td><code class="docutils literal"><span class="pre">&lt;application.id&gt;-&lt;random-UUID&gt;</span></code></td>
</tr>
</tbody>
</table>
<p>If EOS is enabled, other parameters have the following default values.</p>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-odd"><th class="head">Parameter Name</th>
<th class="head">Corresponding Client</th>
<th class="head">Streams Default</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even">
<td>transaction.timeout.ms</td>
<td>Producer</td>
<td><code class="docutils literal"><span class="pre">10000</span></code></td>
</tr>
<tr class="row-odd">
<td>delivery.timeout.ms</td>
<td>Producer</td>
<td><code class="docutils literal"><span class="pre">Integer.MAX_VALUE</span></code></td>
</tr>
</tbody>
</table>
</div>
<div class="section" id="parameters-controlled-by-kafka-streams">
<h3><a class="toc-backref" href="#id26">Parameters controlled by Kafka Streams</a><a class="headerlink" href="#parameters-controlled-by-kafka-streams" title="Permalink to this headline"></a></h3>
<p>Some parameters are not configurable by the user. If you supply a value that is different from the default value,
your value is ignored. Below is a list of some of these parameters.</p>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-odd">
<th class="head">Parameter Name</th>
<th class="head">Corresponding Client</th>
<th class="head">Streams Default</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-odd"><td>allow.auto.create.topics</td>
<td>Consumer</td>
<td><code class="docutils literal"><span class="pre">false</span></code></td>
</tr>
<tr class="row-even"><td>group.id</td>
<td>Consumer</td>
<td><code class="docutils literal"><span class="pre">application.id</span></code></td>
</tr>
<tr class="row-odd"><td>enable.auto.commit</td>
<td>Consumer</td>
<td><code class="docutils literal"><span class="pre">false</span></code></td>
</tr>
<tr class="row-even"><td>partition.assignment.strategy</td>
<td>Consumer</td>
<td><code class="docutils literal"><span class="pre">StreamsPartitionAssignor</span></code></td>
</tr>
</tbody>
</table>
<p>If EOS is enabled, other parameters are set with the following values.</p>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-odd">
<th class="head">Parameter Name</th>
<th class="head">Corresponding Client</th>
<th class="head">Streams Default</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even">
<td>isolation.level</td>
<td>Consumer</td>
<td><code class="docutils literal"><span class="pre">READ_COMMITTED</span></code></td>
</tr>
<tr class="row-odd">
<td>enable.idempotence</td>
<td>Producer</td>
<td><code class="docutils literal"><span class="pre">true</span></code></td>
</tr>
</tbody>
</table>
<div class="section" id="client-id">
<h3><a class="toc-backref" href="#id38">client.id</a><a class="headerlink" href="#client-id" title="Permalink to this headline"></a></h3>
<p>Kafka Streams uses the <code class="docutils literal"><span class="pre">client.id</span></code>
parameter to compute derived client IDs for internal clients. If you don't set
<code class="docutils literal"><span class="pre">client.id</span></code>, Kafka Streams sets it to
<code class="docutils literal"><span class="pre">&lt;application.id&gt;-&lt;random-UUID&gt;</span></code>.</p>
<p>This value will be used to derive the client IDs of the following internal clients.</p>
<table border="1" class="non-scrolling-table docutils">
<colgroup>
<col width="30%">
<col width="70%">
</colgroup>
<thead valign="bottom">
<tr class="row-odd">
<th class="head">Client</th>
<th class="head">client.id</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even">
<td>Consumer</td>
<td><code class="docutils literal"><span class="pre">&lt;client.id&gt;-StreamThread-&lt;threadIdx&gt;-consumer</span></code></td>
</tr>
<tr class="row-odd">
<td>Restore consumer</td>
<td><code class="docutils literal"><span class="pre">&lt;client.id&gt;-StreamThread-&lt;threadIdx&gt;-restore-consumer</span></code></td>
</tr>
<tr class="row-even">
<td>Global consumer</td>
<td><code class="docutils literal"><span class="pre">&lt;client.id&gt;-global-consumer</span></code></td>
</tr>
<tr class="row-odd">
<td rowspan="2">Producer</td>
<td>
<strong>For Non-EOS and EOS v2: </strong><code class="docutils literal"><span class="pre">&lt;client.id&gt;-StreamThread-&lt;threadIdx&gt;-producer</span></code>
</td>
</tr>
<tr>
<td>
<strong>For EOS v1: </strong><code class="docutils literal"><span class="pre">&lt;client.id&gt;-StreamThread-&lt;threadIdx&gt;-&lt;taskId&gt;-producer</span></code>
</td>
</tr>
<tr class="row-even">
<td>Admin</td>
<td><code class="docutils literal"><span class="pre">&lt;client.id&gt;-admin</span></code></td>
</tr>
                            </tbody>
                            </table>
</div>
<div class="section" id="enable-auto-commit">
<span id="streams-developer-guide-consumer-auto-commit"></span><h4><a class="toc-backref" href="#id19">enable.auto.commit</a><a class="headerlink" href="#enable-auto-commit" title="Permalink to this headline"></a></h4>
<blockquote>
<div>The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config
value to <code class="docutils literal"><span class="pre">false</span></code>. Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
to commit the current processing state.</div></blockquote>
</div>
</div>
</div>
</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/write-streams" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/dsl-api" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation">
<!--#include virtual="../../../includes/_nav.htm" -->
<div class="right">
<!--//#include virtual="../../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>