<span id="streams-developer-guide-configuration"></span><h1>Configuring a Streams Application<a class="headerlink" href="#configuring-a-streams-application" title="Permalink to this headline"></a></h1>
<p>Kafka and Kafka Streams configuration options must be configured before using Streams. You can configure Kafka Streams by specifying parameters in a <code class="docutils literal"><span class="pre">java.util.Properties</span></code> instance.</p>
<li><p class="first">Set the <a class="reference internal" href="#streams-developer-guide-required-configs"><span class="std std-ref">parameters</span></a>. For example:</p>
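<p>A minimal sketch follows; the application ID and broker address are placeholders that you would replace with your own values:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Any further settings can be added in the same way before the
// Properties instance is passed to the KafkaStreams constructor.</code></pre>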
<span id="streams-developer-guide-required-configs"></span><h2>Configuration parameter reference<a class="headerlink" href="#configuration-parameter-reference" title="Permalink to this headline"></a></h2>
<p>This section contains the most common Streams configuration parameters. For a full reference, see the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> Javadocs.</p>
<h3><a class="toc-backref" href="#id3">Required configuration parameters</a><a class="headerlink" href="#required-configuration-parameters" title="Permalink to this headline"></a></h3>
<p>Here are the required Streams configuration parameters.</p>
<tr class="row-even"><td>application.id</td>
<td>Required</td>
<td colspan="2">An identifier for the stream processing application. Must be unique within the Kafka cluster.</td>
<td>None</td>
</tr>
<tr class="row-odd"><td>bootstrap.servers</td>
<td>Required</td>
<td colspan="2">A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.</td>
<td>None</td>
</tr>
</tbody>
</table>
<divclass="section"id="application-id">
<h4><aclass="toc-backref"href="#id4">application.id</a><aclass="headerlink"href="#application-id"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>(Required) The application ID. Each stream processing application must have a unique ID. The same ID must be given to
all instances of the application. It is recommended to use only alphanumeric characters, <codeclass="docutils literal"><spanclass="pre">.</span></code> (dot), <codeclass="docutils literal"><spanclass="pre">-</span></code> (hyphen), and <codeclass="docutils literal"><spanclass="pre">_</span></code> (underscore). Examples: <codeclass="docutils literal"><spanclass="pre">"hello_world"</span></code>, <codeclass="docutils literal"><spanclass="pre">"hello_world-v1.0.0"</span></code></p>
<p>This ID is used in the following places to isolate resources used by the application from others:</p>
<ulclass="simple">
<li>As the default Kafka consumer and producer <codeclass="docutils literal"><spanclass="pre">client.id</span></code> prefix</li>
<li>As the Kafka consumer <codeclass="docutils literal"><spanclass="pre">group.id</span></code> for coordination</li>
<li>As the name of the subdirectory in the state directory (cf. <codeclass="docutils literal"><spanclass="pre">state.dir</span></code>)</li>
<li>As the prefix of internal Kafka topic names</li>
</ul>
<dlclass="docutils">
<dt>Tip:</dt>
<dd>When an application is updated, the <codeclass="docutils literal"><spanclass="pre">application.id</span></code> should be changed unless you want to reuse the existing data in internal topics and state stores.
For example, you could embed the version information within <codeclass="docutils literal"><spanclass="pre">application.id</span></code>, as <codeclass="docutils literal"><spanclass="pre">my-app-v1.0.0</span></code> and <codeclass="docutils literal"><spanclass="pre">my-app-v1.0.2</span></code>.</dd>
</dl>
</div></blockquote>
</div>
<div class="section" id="bootstrap-servers">
<h4><a class="toc-backref" href="#id5">bootstrap.servers</a><a class="headerlink" href="#bootstrap-servers" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>(Required) The Kafka bootstrap servers. This is the same <a class="reference external" href="http://kafka.apache.org/documentation.html#producerconfigs">setting</a> that is used by the underlying producer and consumer clients to connect to the Kafka cluster.</p></div></blockquote>
</div>
<h3><a class="toc-backref" href="#id21">Recommended configuration parameters for resiliency</a><a class="headerlink" href="#recommended-configuration-parameters-for-resiliency" title="Permalink to this headline"></a></h3>
<p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:</p>
<p>Increasing the replication factor to 3 ensures that the internal Kafka Streams topics can tolerate up to 2 broker failures. Changing the acks setting to “all”
guarantees that a record will not be lost as long as one replica is alive. The tradeoff of moving from the default values to the recommended ones is
that some performance and more storage space (3x with a replication factor of 3) are sacrificed for more resiliency.</p>
<div class="section" id="acks">
<h4><a class="toc-backref" href="#id22">acks</a><a class="headerlink" href="#acks" title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The number of acknowledgments that the producer requires the leader to have received before considering a request complete. This controls
the durability of records that are sent. The possible values are:</p>
<ulclass="simple">
<li><codeclass="docutils literal"><spanclass="pre">acks=0</span></code> The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the <codeclass="docutils literal"><spanclass="pre">retries</span></code> configuration will not take effect (as the client won’t generally know of any failures). The offset returned for each record will always be set to <codeclass="docutils literal"><spanclass="pre">-1</span></code>.</li>
<li><codeclass="docutils literal"><spanclass="pre">acks=1</span></code> The leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost.</li>
<li><codeclass="docutils literal"><spanclass="pre">acks=all</span></code> The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee.</li>
</ul>
<p>For more information, see the <aclass="reference external"href="https://kafka.apache.org/documentation/#producerconfigs">Kafka Producer documentation</a>.</p>
</div></blockquote>
</div>
<divclass="section"id="id2">
<h4><aclass="toc-backref"href="#id23">replication.factor</a><aclass="headerlink"href="#id2"title="Permalink to this headline"></a></h4>
<blockquote>
<div>See the <aclass="reference internal"href="#replication-factor-parm"><spanclass="std std-ref">description here</span></a>.</div></blockquote>
</div>
<divclass="section"id="i32">
<h4><aclass="toc-backref"href="#id23">num.standby.replicas</a><aclass="headerlink"href="#id2"title="Permalink to this headline"></a></h4>
<blockquote>
<div>See the <aclass="reference internal"href="#streams-developer-guide-standby-replicas"><spanclass="std std-ref">description here</span></a>.</div></blockquote>
</div>
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
// recommended resiliency settings
streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");</code></pre>
<span id="streams-developer-guide-optional-configs"></span><h3><a class="toc-backref" href="#id6">Optional configuration parameters</a><a class="headerlink" href="#optional-configuration-parameters" title="Permalink to this headline"></a></h3>
<p>Here are the optional <a href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> configuration parameters, sorted by level of importance:</p>
<li>High: These parameters can have a significant impact on performance. Take care when deciding the values of these parameters.</li>
<li>Medium: These parameters can have some impact on performance. Your specific environment will determine how much tuning effort should be focused on these parameters.</li>
<li>Low: These parameters have a less general or less significant impact on performance.</li>
<tr><td>acceptable.recovery.lag</td>
<td colspan="2">The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task.</td></tr>
<tr><td>default.deserialization.exception.handler</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td></tr>
<tr><td>default.key.serde</td>
<td colspan="2">Default serializer/deserializer class for record keys, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface. Must be
set by the user or all serdes must be passed in explicitly (see also default.value.serde).</td></tr>
<tr><td>default.production.exception.handler</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td></tr>
<tr><td>default.timestamp.extractor</td>
<td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface.</td></tr>
<tr><td>default.value.serde</td>
<td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface. Must be
set by the user or all serdes must be passed in explicitly (see also default.key.serde).</td></tr>
<tr><td>default.windowed.key.serde.inner</td>
<td colspan="2">Default serializer/deserializer for the inner class of windowed keys, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td></tr>
<tr><td>default.windowed.value.serde.inner</td>
<td colspan="2">Default serializer/deserializer for the inner class of windowed values, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td></tr>
<tr><td>probing.rebalance.interval.ms</td>
<td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td></tr>
<tr><td>processing.guarantee</td>
<td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default)
or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). Deprecated config options are
<code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1) and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2, requires broker version 2.5+).</td></tr>
<tr><td>retry.backoff.ms</td>
<td colspan="2">The amount of time in milliseconds before a request is retried. This applies if the <code class="docutils literal"><span class="pre">retries</span></code> parameter is configured to be greater than 0.</td></tr>
<tr><td>task.timeout.ms</td>
<td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td></tr>
<tr><td>topology.optimization</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>).</td></tr>
<h4><aclass="toc-backref"href="#id27">acceptable.recovery.lag</a><aclass="headerlink"href="#acceptable-recovery-lag"title="Permalink to this headline"></a></h4>
The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign
stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances
that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.
Note: if you set this to <code>Long.MAX_VALUE</code> it effectively disables the warmup replicas and task high availability, allowing Streams to immediately produce a balanced
assignment and migrate tasks to a new instance without first warming them up.
<spanid="streams-developer-guide-deh"></span><h4><aclass="toc-backref"href="#id7">default.deserialization.exception.handler</a><aclass="headerlink"href="#default-deserialization-exception-handler"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default deserialization exception handler allows you to manage records that fail to deserialize. This
can be caused by corrupt data, incorrect serialization logic, or unhandled record types.</p>
<p>You can also provide your own customized exception handler besides the library-provided ones to meet your needs. For example, you can choose to forward corrupt
records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write a corrupted record directly to
the quarantine topic. To be more concrete, you can create a separate <code>KafkaProducer</code> object outside the Streams client, and pass in this object
as well as the dead letter queue topic name into the <code>Properties</code> map, which can then be retrieved from the <code>configure</code> function call.
The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
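<p>Below is a sketch of such a handler. It assumes record keys and values are byte arrays and that the quarantine topic name is passed in via a custom property (here called <code>dead.letter.topic</code>, an illustrative name that you would add to your Streams configuration yourself):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {

    private KafkaProducer&lt;byte[], byte[]&gt; dlqProducer;
    private String dlqTopic;

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord&lt;byte[], byte[]&gt; record,
                                                 final Exception exception) {
        // Forward the corrupt record to the quarantine topic; this write bypasses
        // the Streams runtime and therefore its processing guarantees.
        dlqProducer.send(new ProducerRecord&lt;&gt;(dlqTopic, null, record.timestamp(), record.key(), record.value(), record.headers()));
        // Tell Streams to drop the record and keep processing.
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map&lt;String, ?&gt; configs) {
        // "dead.letter.topic" is an illustrative custom property name.
        dlqTopic = (String) configs.get("dead.letter.topic");

        // Build a standalone producer from the bootstrap servers of the Streams config.
        final Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get("bootstrap.servers"));
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        dlqProducer = new KafkaProducer&lt;&gt;(producerConfig);
    }
}</code></pre>
<p>You would then set <code>default.deserialization.exception.handler</code> to this class and add the <code>dead.letter.topic</code> entry to your Streams <code>Properties</code>.</p>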
<spanid="streams-developer-guide-peh"></span><h4><aclass="toc-backref"href="#id24">default.production.exception.handler</a><aclass="headerlink"href="#default-production-exception-handler"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker,
such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>, which always fails when these exceptions occur.</p>
<p>Each exception handler can return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning <code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams
should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
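<p>A sketch (the class name is illustrative; only <code>RecordTooLargeException</code> is treated as ignorable):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public void configure(final Map&lt;String, ?&gt; configs) {
        // nothing to configure
    }

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord&lt;byte[], byte[]&gt; record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // drop the oversized record and keep processing
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            // shut down on any other production error
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

// register the handler
Properties settings = new Properties();
settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreRecordTooLargeHandler.class);</code></pre>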
<spanid="streams-developer-guide-timestamp-extractor"></span><h4><aclass="toc-backref"href="#id15">default.timestamp.extractor</a><aclass="headerlink"href="#timestamp-extractor"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>A timestamp extractor pulls a timestamp from an instance of <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.
Timestamps are used to control the progress of streams.</p>
<p>The default extractor is <code class="docutils literal"><span class="pre">FailOnInvalidTimestamp</span></code>.
This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer
client since
<a class="reference external" href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka version 0.10</a>.
Depending on the setting of Kafka’s server-side <codeclass="docutils literal"><spanclass="pre">log.message.timestamp.type</span></code> broker and <codeclass="docutils literal"><spanclass="pre">message.timestamp.type</span></code> topic parameters,
this extractor provides you with:</p>
<ulclass="simple">
<li><strong>event-time</strong> processing semantics if <codeclass="docutils literal"><spanclass="pre">log.message.timestamp.type</span></code> is set to <codeclass="docutils literal"><spanclass="pre">CreateTime</span></code> aka “producer time”
(which is the default). This represents the time when a Kafka producer sent the original message. If you use Kafka’s
official producer client, the timestamp represents milliseconds since the epoch.</li>
<li><strong>ingestion-time</strong> processing semantics if <codeclass="docutils literal"><spanclass="pre">log.message.timestamp.type</span></code> is set to <codeclass="docutils literal"><spanclass="pre">LogAppendTime</span></code> aka “broker
time”. This represents the time when the Kafka broker received the original message, in milliseconds since the epoch.</li>
</ul>
<p>The <codeclass="docutils literal"><spanclass="pre">FailOnInvalidTimestamp</span></code> extractor throws an exception if a record contains an invalid (i.e. negative) built-in
timestamp, because Kafka Streams would not process this record but silently drop it. Invalid built-in timestamps can
occur for various reasons: if for example, you consume a topic that is written to by pre-0.10 Kafka producer clients
or by third-party producer clients that don’t support the new Kafka 0.10 message format yet; another situation where
this may happen is after upgrading your Kafka cluster from <codeclass="docutils literal"><spanclass="pre">0.9</span></code> to <codeclass="docutils literal"><spanclass="pre">0.10</span></code>, where all the data that was generated
with <codeclass="docutils literal"><spanclass="pre">0.9</span></code> does not include the <codeclass="docutils literal"><spanclass="pre">0.10</span></code> message timestamps.</p>
<p>If you have data with invalid timestamps and want to process it, then there are two alternative extractors available
(<code class="docutils literal"><span class="pre">LogAndSkipOnInvalidTimestamp</span></code> and <code class="docutils literal"><span class="pre">UsePartitionTimeOnInvalidTimestamp</span></code>).
Both work on built-in timestamps, but handle invalid timestamps differently.</p>
<p>Another built-in extractor, <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code>,
does not actually “extract” a timestamp from the consumed record but rather returns the current time in
milliseconds from the system clock (think: <code class="docutils literal"><span class="pre">System.currentTimeMillis()</span></code>), which effectively means Streams will operate
on the basis of the so-called <strong>processing-time</strong> of events.</p>
<p>You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of
messages. If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or
estimate a timestamp. Returning a negative timestamp will result in data loss – the corresponding record will not be
processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via
<code class="docutils literal"><span class="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> implementation:</p>
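<p>The sketch below assumes a hypothetical payload class <code>Foo</code> with a <code>getTimestampInMillis()</code> method that returns the embedded event timestamp:</p>
<pre class="line-numbers"><code class="language-java">import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record, giving you event-time semantics.
public class MyEventTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord&lt;Object, Object&gt; record, final long previousTimestamp) {
        // `Foo` is a hypothetical payload class with an embedded timestamp
        // (milliseconds since the epoch).
        long timestamp = -1;
        final Foo myPojo = (Foo) record.value();
        if (myPojo != null) {
            timestamp = myPojo.getTimestampInMillis();
        }
        if (timestamp &lt; 0) {
            // Invalid timestamp! Attempt to estimate a new timestamp, otherwise
            // fall back to wall-clock time (processing-time).
            return previousTimestamp &gt;= 0 ? previousTimestamp : System.currentTimeMillis();
        }
        return timestamp;
    }
}</code></pre>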
<h4><a class="toc-backref" href="#id8">default.key.serde</a><a class="headerlink" href="#default-key-serde" title="Permalink to this headline"></a></h4>
<blockquote>
<p>The default Serializer/Deserializer class for record keys. Serialization and deserialization in Kafka Streams happens
whenever data needs to be materialized, for example:</p>
<div><ulclass="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code> and <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
</ul>
<p>This is discussed in more detail in <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data types and serialization</span></a>.</p>
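<p>For example, a minimal sketch that sets the default key and value serdes to the built-in String serde (see also <code>default.value.serde</code> below):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// use String (de)serializers for record keys and values unless overridden per operation
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());</code></pre>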
<h4><aclass="toc-backref"href="#id9">default.value.serde</a><aclass="headerlink"href="#default-value-serde"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for record values. Serialization and deserialization in Kafka Streams
happens whenever data needs to be materialized, for example:</p>
<ulclass="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code> and <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
<p>This is discussed in more detail in <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data types and serialization</span></a>.</p>
<h4><aclass="toc-backref"href="#id32">default.windowed.key.serde.inner</a><aclass="headerlink"href="#default-windowed-key-serde-inner"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for the inner class of windowed keys. Serialization and deserialization in Kafka Streams happens
whenever data needs to be materialized, for example:</p>
<div><ulclass="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code> and <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
<p>This is discussed in more detail in <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data types and serialization</span></a>.</p>
<h4><aclass="toc-backref"href="#id33">default.windowed.value.serde.inner</a><aclass="headerlink"href="#default-windowed-value-serde-inner"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for the inner class of windowed values. Serialization and deserialization in Kafka Streams happens
whenever data needs to be materialized, for example:</p>
<ulclass="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code> and <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
</ul>
<p>This is discussed in more detail in <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data types and serialization</span></a>.</p>
<h4><aclass="toc-backref"href="#id37">rack.aware.assignment.non_overlap_cost</a><aclass="headerlink"href="#rack-aware-assignment-non-overlap-cost"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
This configuration sets the cost of moving a task from the original assignment computed either by <codeclass="docutils literal"><spanclass="pre">StickyTaskAssignor</span></code> or
<codeclass="docutils literal"><spanclass="pre">HighAvailabilityTaskAssignor</span></code>. Together with <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code>,
they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than <code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code>,
the optimizer will try to maintain the existing assignment computed by the task assignor. Note that the optimizer takes the ratio of these two configs into account when deciding whether to favor maintaining the existing assignment or minimizing traffic cost. For example, setting
<code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code> to 10 and <code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code> to 1 is more likely to maintain the existing assignment than setting
<code class="docutils literal"><span class="pre">rack.aware.assignment.non_overlap_cost</span></code> to 100 and <code class="docutils literal"><span class="pre">rack.aware.assignment.traffic_cost</span></code> to 50.
</p>
<p>
The default value is null which means default <codeclass="docutils literal"><spanclass="pre">non_overlap_cost</span></code> in different assignors will be used. In <codeclass="docutils literal"><spanclass="pre">StickyTaskAssignor</span></code>, it has a default value of 10 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> has
a default value of 1, which means maintaining stickiness is preferred in <codeclass="docutils literal"><spanclass="pre">StickyTaskAssignor</span></code>. In <codeclass="docutils literal"><spanclass="pre">HighAvailabilityTaskAssignor</span></code>, it has a default value of 1 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> has
a default value of 10, which means minimizing cross rack traffic is preferred in <codeclass="docutils literal"><spanclass="pre">HighAvailabilityTaskAssignor</span></code>.
<h4><aclass="toc-backref"href="#id35">rack.aware.assignment.strategy</a><aclass="headerlink"href="#rack-aware-assignment-strategy"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
This configuration sets the strategy Kafka Streams uses for rack aware task assignment so that cross rack traffic from brokers to clients can be reduced. This config will only take effect when <code class="docutils literal"><span class="pre">broker.rack</span></code>
is set on the brokers and <code class="docutils literal"><span class="pre">client.rack</span></code> is set on the Kafka Streams side. The possible values are:
</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">none</span></code>. This is the default value, which means rack aware task assignment is disabled.</li>
<li><code class="docutils literal"><span class="pre">min_traffic</span></code>. This setting means that the rack aware task assigner will compute an assignment which tries to minimize cross rack traffic.</li>
<li><code class="docutils literal"><span class="pre">balance_subtopology</span></code>. This setting means that the rack aware task assigner will compute an assignment which tries to balance tasks from the same subtopology across different clients and minimize cross rack traffic on top of that.</li>
</ul>
<p>This config can be used together with <a class="reference internal" href="#rack-aware-assignment-non-overlap-cost">rack.aware.assignment.non_overlap_cost</a> and
<a class="reference internal" href="#rack-aware-assignment-traffic-cost">rack.aware.assignment.traffic_cost</a> to balance reducing cross rack traffic and maintaining the existing assignment.</p>
<h4><aclass="toc-backref"href="#id34">rack.aware.assignment.tags</a><aclass="headerlink"href="#rack-aware-assignment-tags"title="Permalink to this headline"></a>
</h4>
<blockquote>
<div>
<p>
This configuration sets a list of tag keys used to distribute standby replicas across Kafka Streams
clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over
clients with different tag values.
</p>
<p>
Tags for the Kafka Streams clients can be set via the <code class="docutils literal"><span class="pre">client.tag.</span></code> prefix.
For example, suppose there are four Kafka Streams clients across two zones (<code class="docutils literal"><span class="pre">eu-central-1a</span></code>, <code class="docutils literal"><span class="pre">eu-central-1b</span></code>) and across two clusters (<code class="docutils literal"><span class="pre">k8s-cluster1</span></code>, <code class="docutils literal"><span class="pre">k8s-cluster2</span></code>).
For an active task located on <code class="docutils literal"><span class="pre">Client-1</span></code>, Kafka Streams will allocate a standby task on <code class="docutils literal"><span class="pre">Client-4</span></code>, since <code class="docutils literal"><span class="pre">Client-4</span></code> has a different <code class="docutils literal"><span class="pre">zone</span></code> and a different <code class="docutils literal"><span class="pre">cluster</span></code> than <code class="docutils literal"><span class="pre">Client-1</span></code>.
</p>
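<p>A sketch of the corresponding configuration for one such client (the tag keys, tag values, and standby count are illustrative):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Client-1: located in zone eu-central-1a of cluster k8s-cluster1
settings.put(StreamsConfig.CLIENT_TAG_PREFIX + "zone", "eu-central-1a");
settings.put(StreamsConfig.CLIENT_TAG_PREFIX + "cluster", "k8s-cluster1");
// distribute standby tasks over clients whose values for these tags differ
settings.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "zone,cluster");
settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>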
<h4><aclass="toc-backref"href="#id36">rack.aware.assignment.traffic_cost</a><aclass="headerlink"href="#rack-aware-assignment-traffic-cost"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
This configuration sets the cost of cross rack traffic. Together with <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code>,
they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code>,
the optimizer will try to compute an assignment that minimizes the cross rack traffic. Note that the optimizer takes the ratio of these two configs into account when deciding whether to favor maintaining the existing assignment or minimizing traffic cost. For example, setting
<codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> to 10 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code> to 1 is more likely to minimize cross rack traffic than setting
<codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> to 100 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code> to 50.
</p>
<p>
The default value is null which means default traffic cost in different assignors will be used. In <codeclass="docutils literal"><spanclass="pre">StickyTaskAssignor</span></code>, it has a default value of 1 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code>
has a default value of 10. In <codeclass="docutils literal"><spanclass="pre">HighAvailabilityTaskAssignor</span></code>, it has a default value of 10 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code> has a default value of 1.
<spanid="streams-developer-guide-max-task-idle-ms"></span><h4><aclass="toc-backref"href="#id28">max.task.idle.ms</a><aclass="headerlink"href="#max-task-idle-ms"title="Permalink to this headline"></a></h4>
<spanid="streams-developer-guide-max-warmup-replicas"></span><h4><aclass="toc-backref"href="#id29">max.warmup.replicas</a><aclass="headerlink"href="#max-warmup-replicas"title="Permalink to this headline"></a></h4>
The maximum number of warmup replicas (extra standbys beyond the configured <codeclass="docutils literal"><spanclass="pre">num.standbys</span></code>) that can be assigned at once for the purpose of keeping
the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker
traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time
for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1.
</p>
<p>
Note that one warmup replica corresponds to one <a href="https://kafka.apache.org/34/documentation/streams/architecture#streams_architecture_tasks">Stream Task</a>. Furthermore, note that each warmup task can only be promoted to an active task during
a rebalance (normally during a so-called probing rebalance, which occurs at a frequency specified by the
<code class="docutils literal"><span class="pre">probing.rebalance.interval.ms</span></code> config). This means that the
maximum rate at which active tasks can be migrated from one Kafka Streams instance to another instance can be determined by (<code class="docutils literal"><span class="pre">max.warmup.replicas</span></code> / <code class="docutils literal"><span class="pre">probing.rebalance.interval.ms</span></code>).
<spanid="streams-developer-guide-standby-replicas"></span><h4><aclass="toc-backref"href="#id10">num.standby.replicas</a><aclass="headerlink"href="#num-standby-replicas"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the
specified number of replicas per store and keep them up to date as long as there are enough instances running. Standby replicas are used to minimize the latency of task failover. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of
resuming tasks on failover can be found in the <a class="reference internal" href="../architecture.html#streams_architecture_state"><span class="std std-ref">State</span></a> section.
<dl class="docutils">
<dt>Recommendation:</dt>
<dd>Increase the number of standbys to 1 to get instant fail-over, i.e., high-availability.
Increasing the number of standbys requires more client-side storage space.
For example, with 1 standby, 2x space is required.</dd>
</dl>
<dlclass="docutils">
<dt>Note:</dt>
<dd>If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite><codeclass="docutils literal"><spanclass="pre">KafkaStreams</span></code> instances.</dd>
For more information about Kafka Streams threading model, see <aclass="reference internal"href="../architecture.html#streams_architecture_threads"><spanclass="std std-ref">Threading Model</span></a>.</div></blockquote>
<h4><aclass="toc-backref"href="#id30">probing.rebalance.interval.ms</a><aclass="headerlink"href="#probing-rebalance-interval-ms"title="Permalink to this headline"></a></h4>
The maximum time to wait before triggering a rebalance to probe for warmup replicas that have restored enough to be considered caught up. Streams will only assign stateful active tasks to
instances that are caught up and within the <aclass="reference internal"href="#acceptable-recovery-lag"><spanclass="std std-ref">acceptable.recovery.lag</span></a>, if any exist. Probing rebalances are used to query the latest total lag of warmup replicas and transition
them to active tasks if ready. They will continue to be triggered as long as there are warmup tasks, and until the assignment is balanced. Must be at least 1 minute.
<spanid="streams-developer-guide-processing-guarantee"></span><h4><aclass="toc-backref"href="#id25">processing.guarantee</a><aclass="headerlink"href="#processing-guarantee"title="Permalink to this headline"></a></h4>
<spanid="replication-factor-parm"></span><h4><aclass="toc-backref"href="#id13">replication.factor</a><aclass="headerlink"href="#replication-factor"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is
repartitioned for aggregation. Replication is important for fault tolerance. Without replication even a single broker failure
may prevent progress of the stream processing application. It is recommended to use a similar replication factor as source topics.</p>
<dlclass="docutils">
<dt>Recommendation:</dt>
<dd>Increase the replication factor to 3 to ensure that the internal Kafka Streams topic can tolerate up to 2 broker failures.
<spanid="streams-developer-guide-rocksdb-config"></span><h4><aclass="toc-backref"href="#id20">rocksdb.config.setter</a><aclass="headerlink"href="#rocksdb-config-setter"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default
configuration for RocksDB, you can implement <codeclass="docutils literal"><spanclass="pre">RocksDBConfigSetter</span></code> and provide your custom class via <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
<p>Here is an example that adjusts the memory size consumed by RocksDB.</p>
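<p>Below is a sketch of such a setter; the cache and buffer sizes are illustrative, and the list that follows explains the key lines:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Map;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {

    // Keep the cache as a field so it can be closed in close() below.
    private final Cache cache = new LRUCache(16 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map&lt;String, Object&gt; configs) {
        // Reuse the existing table config rather than creating a new one.
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(16 * 1024L);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Close RocksObjects you constructed yourself to avoid memory leaks.
        cache.close();
    }
}

// register the setter
Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);</code></pre>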
<li><codeclass="docutils literal"><spanclass="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <codeclass="docutils literal"><spanclass="pre">BloomFilter</span></code>, which is an important optimization.
<li><codeclass="docutils literal"><spanclass="pre">tableConfig.setBlockSize(16</span><spanclass="pre">*</span><spanclass="pre">1024L);</span></code> Modify the default <aclass="reference external"href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <aclass="reference external"href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
<h4><aclass="toc-backref"href="#id14">state.dir</a><aclass="headerlink"href="#state-dir"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting
machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated
with the application are created under this subdirectory. When running multiple instances of the same application on a single machine,
this path must be unique for each such instance.</div>
</blockquote>
</div>
<divclass="section"id="topology-optimization">
<h4><aclass="toc-backref"href="#id31">topology.optimization</a><aclass="headerlink"href="#topology-optimization"title="Permalink to this headline"></a></h4>
A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>)).
</p>
<p>
We recommend listing specific optimizations in the config for production code so that the structure of your topology will not change unexpectedly during upgrades of the Streams library.
</p>
<p>
These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
<spanid="streams-developer-guide-upgrade-from"></span><h4><aclass="toc-backref"href="#id14">upgrade.from</a><aclass="headerlink"href="#upgrade-from"title="Permalink to this headline"></a></h4>
The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide.
You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the
newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path
when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
<h3><aclass="toc-backref"href="#id16">Kafka consumers, producer and admin client configuration parameters</a><aclass="headerlink"href="#kafka-consumers-and-producer-configuration-parameters"title="Permalink to this headline"></a></h3>
<p>You can specify parameters for the Kafka <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>, <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a>,
and <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/clients/admin/package-summary.html">admin client</a> that are used internally.
The consumer, producer and admin client settings are defined by specifying parameters in a <codeclass="docutils literal"><spanclass="pre">StreamsConfig</span></code> instance.</p>
<p>In this example, the Kafka <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer session timeout</a> is configured to be 60000 milliseconds in the Streams settings:</p>
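<p>A minimal sketch (the application ID and broker address are placeholders):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// ordinary Streams settings
streamsSettings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// customize the session timeout of the embedded Kafka consumers (60000 ms)
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);</code></pre>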
<h4><aclass="toc-backref"href="#id17">Naming</a><aclass="headerlink"href="#naming"title="Permalink to this headline"></a></h4>
<p>Some consumer, producer, and admin client configuration parameters use the same parameter name, and the Kafka Streams library itself also uses some parameters that share the same name with its embedded clients. For example, <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> and
<code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries for client requests;
<code class="docutils literal"><span class="pre">retries</span></code> is used to configure how many retries are allowed when handling retriable errors from broker request responses.
You can avoid duplicate names by prefixing parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code>, <code class="docutils literal"><span class="pre">producer.</span></code>, or <code class="docutils literal"><span class="pre">admin.</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
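<p>For example (<code>PARAMETER_NAME</code> is a placeholder for a real client parameter name):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// same value for consumer and producer
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer and producer
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
// alternatively, you can use the helper methods on StreamsConfig
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");</code></pre>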
<p>You could further separate consumer configuration by adding different prefixes:</p>
<ulclass="simple">
<li><code class="docutils literal"><span class="pre">main.consumer.</span></code> for the main consumer, which is the default consumer of stream sources.</li>
<li><code class="docutils literal"><span class="pre">restore.consumer.</span></code> for the restore consumer, which is in charge of state store recovery.</li>
<li><code class="docutils literal"><span class="pre">global.consumer.</span></code> for the global consumer, which is used in global KTable construction.</li>
</ul>
<p>For example, if you only want to set restore consumer config without touching other consumers' settings, you could simply use <codeclass="docutils literal"><spanclass="pre">restore.consumer.</span></code> to set the config.</p>
<p> The same applies to <code class="docutils literal"><span class="pre">main.consumer.</span></code> and <code class="docutils literal"><span class="pre">global.consumer.</span></code>, if you only want to specify the config for one consumer type.</p>
<p> Additionally, to configure the internal repartition/changelog topics, you could use the <codeclass="docutils literal"><spanclass="pre">topic.</span></code> prefix, followed by any of the standard topic configs.</p>
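<p>For example, a sketch that overrides <code>min.insync.replicas</code> for the internal topics (the value shown assumes a replication factor of 3):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// apply a standard topic config to the internal repartition and changelog topics
streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
// equivalent string form
streamsSettings.put("topic.min.insync.replicas", 2);</code></pre>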
<h3><aclass="toc-backref"href="#id26">Parameters controlled by Kafka Streams</a><aclass="headerlink"href="#parameters-controlled-by-kafka-streams"title="Permalink to this headline"></a></h3>
<strong>For Non-EOS and EOS v2: </strong><codeclass="docutils literal"><spanclass="pre"><client.id>-StreamThread-<threadIdx>-producer</span></code>
</td>
</tr>
<tr>
<td>
<strong>For EOS v1: </strong><codeclass="docutils literal"><spanclass="pre"><client.id>-StreamThread-<threadIdx>-<taskId>-producer</span></code>
<spanid="streams-developer-guide-consumer-auto-commit"></span><h4><aclass="toc-backref"href="#id19">enable.auto.commit</a><aclass="headerlink"href="#enable-auto-commit"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config
value to <codeclass="docutils literal"><spanclass="pre">false</span></code>. Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
to commit the current processing state.</div></blockquote>