<spanid="streams-developer-guide-configuration"></span><h1>Configuring a Streams Application<aclass="headerlink"href="#configuring-a-streams-application"title="Permalink to this headline"></a></h1>
<p>Kafka and Kafka Streams configuration options must be set before running a Streams application. You configure Kafka Streams by specifying parameters in a <code class="docutils literal"><span class="pre">java.util.Properties</span></code> instance.</p>
<li><pclass="first">Set the <aclass="reference internal"href="#streams-developer-guide-required-configs"><spanclass="std std-ref">parameters</span></a>. For example:</p>
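<p>As a minimal sketch, the required parameters might be set like this (the application ID and broker address are placeholders):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Any further settings, e.g. the state directory
settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");</code></pre>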
<spanid="streams-developer-guide-required-configs"></span><h2>Configuration parameter reference<aclass="headerlink"href="#configuration-parameter-reference"title="Permalink to this headline"></a></h2>
<p>This section contains the most common Streams configuration parameters. For a full reference, see the <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> Javadocs.</p>
<h3><aclass="toc-backref"href="#id3">Required configuration parameters</a><aclass="headerlink"href="#required-configuration-parameters"title="Permalink to this headline"></a></h3>
<p>Here are the required Streams configuration parameters.</p>
<tdcolspan="2">An identifier for the stream processing application. Must be unique within the Kafka cluster.</td>
<td>None</td>
</tr>
<trclass="row-odd"><td>bootstrap.servers</td>
<td>Required</td>
<tdcolspan="2">A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.</td>
<td>None</td>
</tr>
</tbody>
</table>
<divclass="section"id="application-id">
<h4><aclass="toc-backref"href="#id4">application.id</a><aclass="headerlink"href="#application-id"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>(Required) The application ID. Each stream processing application must have a unique ID. The same ID must be given to
all instances of the application. It is recommended to use only alphanumeric characters, <codeclass="docutils literal"><spanclass="pre">.</span></code> (dot), <codeclass="docutils literal"><spanclass="pre">-</span></code> (hyphen), and <codeclass="docutils literal"><spanclass="pre">_</span></code> (underscore). Examples: <codeclass="docutils literal"><spanclass="pre">"hello_world"</span></code>, <codeclass="docutils literal"><spanclass="pre">"hello_world-v1.0.0"</span></code></p>
<p>This ID is used in the following places to isolate resources used by the application from others:</p>
<ulclass="simple">
<li>As the default Kafka consumer and producer <codeclass="docutils literal"><spanclass="pre">client.id</span></code> prefix</li>
<li>As the Kafka consumer <codeclass="docutils literal"><spanclass="pre">group.id</span></code> for coordination</li>
<li>As the name of the subdirectory in the state directory (cf. <codeclass="docutils literal"><spanclass="pre">state.dir</span></code>)</li>
<li>As the prefix of internal Kafka topic names</li>
</ul>
<dlclass="docutils">
<dt>Tip:</dt>
<dd>When an application is updated, the <codeclass="docutils literal"><spanclass="pre">application.id</span></code> should be changed unless you want to reuse the existing data in internal topics and state stores.
For example, you could embed the version information within <codeclass="docutils literal"><spanclass="pre">application.id</span></code>, as <codeclass="docutils literal"><spanclass="pre">my-app-v1.0.0</span></code> and <codeclass="docutils literal"><spanclass="pre">my-app-v1.0.2</span></code>.</dd>
</dl>
</div></blockquote>
</div>
<divclass="section"id="bootstrap-servers">
<h4><aclass="toc-backref"href="#id5">bootstrap.servers</a><aclass="headerlink"href="#bootstrap-servers"title="Permalink to this headline"></a></h4>
<div><p>(Required) The Kafka bootstrap servers. This is the same <aclass="reference external"href="/{{version}}/documentation.html#producerconfigs">setting</a> that is used by the underlying producer and consumer clients to connect to the Kafka cluster.
<h3><aclass="toc-backref"href="#id21">Recommended configuration parameters for resiliency</a><aclass="headerlink"href="#recommended-configuration-parameters-for-resiliency"title="Permalink to this headline"></a></h3>
<p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in the face of broker failures:</p>
<li><codeclass="docutils literal"><spanclass="pre">acks="0"</span></code> The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the producer won’t generally know of any failures. The offset returned for each record will always be set to <codeclass="docutils literal"><spanclass="pre">-1</span></code>.</li>
<li><codeclass="docutils literal"><spanclass="pre">acks="1"</span></code> The leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost.</li>
<li><codeclass="docutils literal"><spanclass="pre">acks="all"</span></code> (default since 3.0 release) The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee.</li>
<p>For more information, see the <aclass="reference external"href="https://kafka.apache.org/documentation/#producerconfigs">Kafka Producer documentation</a>.</p>
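<p>As a sketch, such resiliency settings might be applied from the Streams application like this, using the <code class="docutils literal"><span class="pre">producerPrefix</span></code> helper to target the embedded producer:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// Replicate internal topics so they survive broker failures
streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// Make the embedded producer wait for acknowledgement from all in-sync replicas
// (explicit here even though "all" is the default since the 3.0 release)
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");</code></pre>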
<spanid="streams-developer-guide-optional-configs"></span><h3><aclass="toc-backref"href="#id6">Optional configuration parameters</a><aclass="headerlink"href="#optional-configuration-parameters"title="Permalink to this headline"></a></h3>
<p>Here are the optional <a href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> configuration parameters, sorted by level of importance:</p>
<li>High: These are parameters with a default value which is most likely not a good fit for production use. It's highly recommended to revisit these parameters for production usage.</li>
<li>Medium: The default values of these parameters should work for production for many cases, but it's not uncommon that they are changed, for example to tune performance.</li>
<li>Low: It should rarely be necessary to change the value for these parameters. It's only recommended to change them if there is a very specific issue you want to address.</li>
<tdcolspan="2">The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task.</td>
<tdcolspan="2">Exception handling class that implements the <codeclass="docutils literal"><spanclass="pre">DeserializationExceptionHandler</span></code> interface.</td>
<tdcolspan="2">Default serializer/deserializer class for record keys, implements the <codeclass="docutils literal"><spanclass="pre">Serde</span></code> interface. Must be
set by the user or all serdes must be passed in explicitly (see also default.value.serde).</td>
<tdcolspan="2">Exception handling class that implements the <codeclass="docutils literal"><spanclass="pre">ProductionExceptionHandler</span></code> interface.</td>
<tdcolspan="2">Timestamp extractor class that implements the <codeclass="docutils literal"><spanclass="pre">TimestampExtractor</span></code> interface.
See <aclass="reference internal"href="#streams-developer-guide-timestamp-extractor"><spanclass="std std-ref">Timestamp Extractor</span></a></td>
<tdcolspan="2">Default serializer/deserializer class for record values, implements the <codeclass="docutils literal"><spanclass="pre">Serde</span></code> interface. Must be
set by the user or all serdes must be passed in explicitly (see also default.key.serde).</td>
<tdcolspan="2">Exception handling class that implements the <codeclass="docutils literal"><spanclass="pre">DeserializationExceptionHandler</span></code> interface.</td>
<tdcolspan="2">Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client.</td>
<tdcolspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td>
<tdcolspan="2">Exception handling class that implements the <codeclass="docutils literal"><spanclass="pre">ProcessingExceptionHandler</span></code> interface.</td>
<tdcolspan="2">Exception handling class that implements the <codeclass="docutils literal"><spanclass="pre">ProductionExceptionHandler</span></code> interface.</td>
<tdcolspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
<tdcolspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>),
<h4><aclass="toc-backref"href="#id27">acceptable.recovery.lag</a><aclass="headerlink"href="#acceptable-recovery-lag"title="Permalink to this headline"></a></h4>
The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign
stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances
that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.
Note: if you set this to <code>Long.MAX_VALUE</code> it effectively disables the warmup replicas and task high availability, allowing Streams to immediately produce a balanced
assignment and migrate tasks to a new instance without first warming them up.
<spanid="streams-developer-guide-deh"></span><h4><aclass="toc-backref"href="#id7">deserialization.exception.handler (deprecated: default.deserialization.exception.handler)</a><aclass="headerlink"href="#deserialization-exception-handler"title="Permalink to this headline"></a></h4>
<p>You can also provide your own customized exception handler besides the library provided ones to meet your needs. For example, you can choose to forward corrupt
records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write a corrupted record directly to
the quarantine topic. To be more concrete, you can create a separate <code>KafkaProducer</code> object outside the Streams client, and pass in this object
as well as the dead letter queue topic name into the <code>Properties</code> map, which then can be retrieved from the <code>configure</code> function call.
The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
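<p>A minimal sketch of such a handler is shown below. It assumes byte-array keys and values and a hypothetical <code class="docutils literal"><span class="pre">dlq.topic</span></code> entry in the configs map; it is a sketch, not a production-ready implementation:</p>
<pre class="line-numbers"><code class="language-java">import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;

public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    private KafkaProducer<byte[], byte[]> dlqProducer;
    private String dlqTopic;

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Write the corrupted record to the dead letter queue topic and keep processing.
        dlqProducer.send(new ProducerRecord<>(dlqTopic, record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // "dlq.topic" is a hypothetical config entry used here for illustration only.
        dlqTopic = (String) configs.get("dlq.topic");
        final Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
        dlqProducer = new KafkaProducer<>(producerConfig, new ByteArraySerializer(), new ByteArraySerializer());
    }
}</code></pre>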
<spanid="streams-developer-guide-peh"></span><h4><aclass="toc-backref"href="#id24">production.exception.handler (deprecated: default.production.exception.handler)</a><aclass="headerlink"href="#production-exception-handler"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The production exception handler allows you to manage exceptions triggered when trying to interact with a broker,
such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>, which always fails when these exceptions occur.</p>
<p>An exception handler can return <code>FAIL</code>, <code>CONTINUE</code>, or <code>RETRY</code> depending on the record and the exception thrown. Returning <code>FAIL</code> will signal that Streams should shut down. <code>CONTINUE</code> will signal that Streams
should ignore the issue and continue processing. For <code>RetriableException</code> the handler may return <code>RETRY</code> to tell the runtime to retry sending the failed record (<b>Note:</b> If <code>RETRY</code> is returned for a non-<code>RetriableException</code>
it will be treated as <code>FAIL</code>.) If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
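<p>A sketch of such a handler is shown below; it simply drops records rejected with <code class="docutils literal"><span class="pre">RecordTooLargeException</span></code> and fails on everything else:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Drop records that are too large, fail on everything else.
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}</code></pre>
<p>You would then register this class via the <code class="docutils literal"><span class="pre">production.exception.handler</span></code> setting.</p>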
<spanid="streams-developer-guide-timestamp-extractor"></span><h4><aclass="toc-backref"href="#id15">default.timestamp.extractor</a><aclass="headerlink"href="#timestamp-extractor"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>A timestamp extractor pulls a timestamp from an instance of <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.
Timestamps are used to control the progress of streams.</p>
<p>The default extractor is <code class="docutils literal"><span class="pre">FailOnInvalidTimestamp</span></code>. This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer client since
<aclass="reference external"href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka version 0.10</a>.
Depending on the setting of Kafka’s server-side <codeclass="docutils literal"><spanclass="pre">log.message.timestamp.type</span></code> broker and <codeclass="docutils literal"><spanclass="pre">message.timestamp.type</span></code> topic parameters,
this extractor provides you with:</p>
<ulclass="simple">
<li><strong>event-time</strong> processing semantics if <codeclass="docutils literal"><spanclass="pre">log.message.timestamp.type</span></code> is set to <codeclass="docutils literal"><spanclass="pre">CreateTime</span></code> aka “producer time”
(which is the default). This represents the time when a Kafka producer sent the original message. If you use Kafka’s
official producer client, the timestamp represents milliseconds since the epoch.</li>
<li><strong>ingestion-time</strong> processing semantics if <codeclass="docutils literal"><spanclass="pre">log.message.timestamp.type</span></code> is set to <codeclass="docutils literal"><spanclass="pre">LogAppendTime</span></code> aka “broker
time”. This represents the time when the Kafka broker received the original message, in milliseconds since the epoch.</li>
</ul>
<p>The <codeclass="docutils literal"><spanclass="pre">FailOnInvalidTimestamp</span></code> extractor throws an exception if a record contains an invalid (i.e. negative) built-in
timestamp, because Kafka Streams would not process this record but silently drop it. Invalid built-in timestamps can
occur for various reasons: if for example, you consume a topic that is written to by pre-0.10 Kafka producer clients
or by third-party producer clients that don’t support the new Kafka 0.10 message format yet; another situation where
this may happen is after upgrading your Kafka cluster from <codeclass="docutils literal"><spanclass="pre">0.9</span></code> to <codeclass="docutils literal"><spanclass="pre">0.10</span></code>, where all the data that was generated
with <codeclass="docutils literal"><spanclass="pre">0.9</span></code> does not include the <codeclass="docutils literal"><spanclass="pre">0.10</span></code> message timestamps.</p>
<p>If you have data with invalid timestamps and want to process it, then there are two alternative extractors available.
Both work on built-in timestamps, but handle invalid timestamps differently: <code class="docutils literal"><span class="pre">LogAndSkipOnInvalidTimestamp</span></code>, which logs a warning and lets the record be silently dropped, and <code class="docutils literal"><span class="pre">UsePartitionTimeOnInvalidTimestamp</span></code>, which estimates a timestamp from the latest valid timestamp of the same topic partition and raises an exception if no estimate is possible.</p>
The <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not actually “extract” a timestamp from the consumed record but rather returns the current time in
milliseconds from the system clock (think: <codeclass="docutils literal"><spanclass="pre">System.currentTimeMillis()</span></code>), which effectively means Streams will operate
on the basis of the so-called <strong>processing-time</strong> of events.</p>
<p>You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of
messages. If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or
estimate a timestamp. Returning a negative timestamp will result in data loss – the corresponding record will not be
processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via
<codeclass="docutils literal"><spanclass="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom
<h4><a class="toc-backref" href="#id8">default.key.serde</a><a class="headerlink" href="#default-key-serde" title="Permalink to this headline"></a></h4>
<blockquote>
<p>The default Serializer/Deserializer class for record keys, which must implement the <code class="docutils literal"><span class="pre">Serde</span></code> interface. Serialization and deserialization in Kafka Streams happens
whenever data needs to be materialized, for example:</p>
<div><ulclass="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code> and <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
</ul>
<p>This is discussed in more detail in <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data types and serialization</span></a>.</p>
<h4><aclass="toc-backref"href="#id9">default.value.serde</a><aclass="headerlink"href="#default-value-serde"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for record values, which must implement the <code class="docutils literal"><span class="pre">Serde</span></code> interface. Serialization and deserialization in Kafka Streams
happens whenever data needs to be materialized, for example:</p>
<ulclass="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code> and <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
<p>This is discussed in more detail in <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data types and serialization</span></a>.</p>
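<p>As a minimal sketch, both default serdes might be configured together like this (using the built-in String and Long serdes as examples):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());</code></pre>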
<h4><aclass="toc-backref"href="#id37">rack.aware.assignment.non_overlap_cost</a><aclass="headerlink"href="#rack-aware-assignment-non-overlap-cost"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
This configuration sets the cost of moving a task from the original assignment computed either by <codeclass="docutils literal"><spanclass="pre">StickyTaskAssignor</span></code> or
<codeclass="docutils literal"><spanclass="pre">HighAvailabilityTaskAssignor</span></code>. Together with <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code>,
they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code>,
the optimizer will try to maintain the existing assignment computed by the task assignor. Note that the optimizer considers the ratio of these two configs when deciding whether to favor maintaining the existing assignment or minimizing traffic cost. For example, setting
<codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code> to 10 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> to 1 is more likely to maintain existing assignment than setting
<codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code> to 100 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> to 50.
</p>
<p>
The default value is null which means default <codeclass="docutils literal"><spanclass="pre">non_overlap_cost</span></code> in different assignors will be used. In <codeclass="docutils literal"><spanclass="pre">StickyTaskAssignor</span></code>, it has a default value of 10 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> has
a default value of 1, which means maintaining stickiness is preferred in <codeclass="docutils literal"><spanclass="pre">StickyTaskAssignor</span></code>. In <codeclass="docutils literal"><spanclass="pre">HighAvailabilityTaskAssignor</span></code>, it has a default value of 1 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> has
a default value of 10, which means minimizing cross rack traffic is preferred in <codeclass="docutils literal"><spanclass="pre">HighAvailabilityTaskAssignor</span></code>.
<h4><aclass="toc-backref"href="#id35">rack.aware.assignment.strategy</a><aclass="headerlink"href="#rack-aware-assignment-strategy"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
This configuration sets the strategy Kafka Streams uses for rack aware task assignment so that cross traffic from broker to client can be reduced. This config will only take effect when <codeclass="docutils literal"><spanclass="pre">broker.rack</span></code>
is set on the brokers and <codeclass="docutils literal"><spanclass="pre">client.rack</span></code> is set on Kafka Streams side. There are two settings for this config:
</p>
<ulclass="simple">
<li><codeclass="docutils literal"><spanclass="pre">none</span></code>. This is the default value which means rack aware task assignment will be disabled.</li>
<li><codeclass="docutils literal"><spanclass="pre">min_traffic</span></code>. This settings means that the rack aware task assigner will compute an assignment which tries to minimize cross rack traffic.</li>
<li><codeclass="docutils literal"><spanclass="pre">balance_subtopology</span></code>. This settings means that the rack aware task assigner will compute an assignment which will try to balance tasks from same subtopology to different clients and minimize cross rack traffic on top of that.</li>
This config can be used together with <aclass="reference internal"href="#rack-aware-assignment-non-overlap-cost">rack.aware.assignment.non_overlap_cost</a> and
<aclass="reference internal"href="#rack-aware-assignment-traffic-cost">rack.aware.assignment.traffic_cost</a> to balance reducing cross rack traffic and maintaining the existing assignment.
<h4><aclass="toc-backref"href="#id34">rack.aware.assignment.tags</a><aclass="headerlink"href="#rack-aware-assignment-tags"title="Permalink to this headline"></a>
</h4>
<blockquote>
<div>
<p>
This configuration sets a list of tag keys used to distribute standby replicas across Kafka Streams
clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over
clients with different tag values.
</p>
<p>
Tags for the Kafka Streams clients can be set via the <code class="docutils literal"><span class="pre">client.tag.</span></code> prefix, as shown in the example below.
In that example, we have four Kafka Streams clients across two zones (<code class="docutils literal"><span class="pre">eu-central-1a</span></code>, <code class="docutils literal"><span class="pre">eu-central-1b</span></code>) and across two clusters (<code class="docutils literal"><span class="pre">k8s-cluster1</span></code>, <code class="docutils literal"><span class="pre">k8s-cluster2</span></code>).
For an active task located on <code class="docutils literal"><span class="pre">Client-1</span></code>, Kafka Streams will allocate a standby task on <code class="docutils literal"><span class="pre">Client-4</span></code>, since <code class="docutils literal"><span class="pre">Client-4</span></code> has a different <code class="docutils literal"><span class="pre">zone</span></code> and a different <code class="docutils literal"><span class="pre">cluster</span></code> than <code class="docutils literal"><span class="pre">Client-1</span></code>.</p>
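<p>A sketch of how two of those four clients might be configured, using the <code class="docutils literal"><span class="pre">StreamsConfig.clientTagPrefix</span></code> helper (zone and cluster values are placeholders taken from the description above):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Client-1 (zone eu-central-1a, cluster k8s-cluster1)
Properties client1 = new Properties();
client1.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
client1.put(StreamsConfig.clientTagPrefix("cluster"), "k8s-cluster1");
client1.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "zone,cluster");

// Client-4 (zone eu-central-1b, cluster k8s-cluster2) -- Client-2 and Client-3 are configured analogously
Properties client4 = new Properties();
client4.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1b");
client4.put(StreamsConfig.clientTagPrefix("cluster"), "k8s-cluster2");
client4.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "zone,cluster");</code></pre>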
<h4><aclass="toc-backref"href="#id36">rack.aware.assignment.traffic_cost</a><aclass="headerlink"href="#rack-aware-assignment-traffic-cost"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
This configuration sets the cost of cross rack traffic. Together with <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code>,
they control whether the optimizer favors minimizing cross rack traffic or minimizing the movement of tasks in the existing assignment. If this config is set to a larger value than <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code>,
the optimizer will try to compute an assignment which minimizes cross rack traffic. Note that the optimizer considers the ratio of these two configs when deciding whether to favor maintaining the existing assignment or minimizing traffic cost. For example, setting
<codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> to 10 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code> to 1 is more likely to minimize cross rack traffic than setting
<codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.traffic_cost</span></code> to 100 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code> to 50.
</p>
<p>
The default value is null which means default traffic cost in different assignors will be used. In <codeclass="docutils literal"><spanclass="pre">StickyTaskAssignor</span></code>, it has a default value of 1 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code>
has a default value of 10. In <codeclass="docutils literal"><spanclass="pre">HighAvailabilityTaskAssignor</span></code>, it has a default value of 10 and <codeclass="docutils literal"><spanclass="pre">rack.aware.assignment.non_overlap_cost</span></code> has a default value of 1.
<h4><aclass="toc-backref"href="#id40">log.summary.interval.ms</a><aclass="headerlink"href="#log-summary-interval-ms"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
This configuration controls the output interval for summary information.
If greater than or equal to 0, the summary log will be output according to the set time interval; if negative, the summary output is disabled.
<spanid="streams-developer-guide-enable-metrics-push"></span><h4><aclass="toc-backref"href="#id43">enable.metrics.push</a><aclass="headerlink"href="#enable-metrics-push"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
Kafka Streams metrics can be pushed to the brokers similarly to client metrics.
Additionally, Kafka Streams allows you to enable or disable metric pushing for each embedded client individually.
However, pushing Kafka Streams metrics requires that <code>enable.metrics.push</code> is enabled on the main consumer and admin client.
<spanid="streams-developer-guide-max-task-idle-ms"></span><h4><aclass="toc-backref"href="#id28">max.task.idle.ms</a><aclass="headerlink"href="#max-task-idle-ms"title="Permalink to this headline"></a></h4>
<spanid="streams-developer-guide-max-warmup-replicas"></span><h4><aclass="toc-backref"href="#id29">max.warmup.replicas</a><aclass="headerlink"href="#max-warmup-replicas"title="Permalink to this headline"></a></h4>
The maximum number of warmup replicas (extra standbys beyond the configured <codeclass="docutils literal"><spanclass="pre">num.standbys</span></code>) that can be assigned at once for the purpose of keeping
the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker
traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time
for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1.
Note that one warmup replica corresponds to one <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">Stream Task</a>. Furthermore, note that each warmup task can only be promoted to an active task during a rebalance (normally during a so-called probing rebalance, which occurs at the frequency specified by <code class="docutils literal"><span class="pre">probing.rebalance.interval.ms</span></code>).
<spanid="streams-developer-guide-standby-replicas"></span><h4><aclass="toc-backref"href="#id10">num.standby.replicas</a><aclass="headerlink"href="#num-standby-replicas"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the
specified number of replicas per store and keep them up to date as long as there are enough instances running. Standby replicas are used to minimize the latency of task failover. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of
resuming tasks on failover can be found in the <a class="reference internal" href="../architecture.html#streams_architecture_state"><span class="std std-ref">State</span></a> section.
<dd>Increase the number of standbys to 1 to get instant fail-over, i.e., high-availability.
Increasing the number of standbys requires more client-side storage space.
For example, with 1 standby, 2x space is required.</dd>
</dl>
<dlclass="docutils">
<dt>Note:</dt>
<dd>If you enable <cite>n</cite> standby tasks, you need to provision <cite>n+1</cite><codeclass="docutils literal"><spanclass="pre">KafkaStreams</span></code> instances.</dd>
For more information about Kafka Streams threading model, see <aclass="reference internal"href="../architecture.html#streams_architecture_threads"><spanclass="std std-ref">Threading Model</span></a>.</div></blockquote>
<h4><aclass="toc-backref"href="#id30">probing.rebalance.interval.ms</a><aclass="headerlink"href="#probing-rebalance-interval-ms"title="Permalink to this headline"></a></h4>
The maximum time to wait before triggering a rebalance to probe for warmup replicas that have restored enough to be considered caught up. Streams will only assign stateful active tasks to
instances that are caught up and within the <aclass="reference internal"href="#acceptable-recovery-lag"><spanclass="std std-ref">acceptable.recovery.lag</span></a>, if any exist. Probing rebalances are used to query the latest total lag of warmup replicas and transition
them to active tasks if ready. They will continue to be triggered as long as there are warmup tasks, and until the assignment is balanced. Must be at least 1 minute.
<spanid="streams-developer-guide-proceh"></span><h4><aclass="toc-backref"href="#id41">processing.exception.handler</a><aclass="headerlink"href="#processing-exception-handler"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The processing exception handler allows you to manage exceptions triggered during the processing of a record. The implemented exception
handler needs to return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning
<code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams should ignore the issue
and continue processing. The following library built-in exception handlers are available:</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">LogAndContinueProcessingExceptionHandler</span></code>:
This handler logs the processing exception and then signals the processing pipeline to continue processing more records.</li>
<li><code class="docutils literal"><span class="pre">LogAndFailProcessingExceptionHandler</span></code>:
This handler logs the processing exception and then signals the processing pipeline to stop processing more records.</li>
</ul>
<p>You can also provide your own customized exception handler besides the library provided ones to meet your needs. For example, you can choose to forward corrupt
records into a quarantine topic (think: a "dead letter queue") for further processing. To do this, use the Producer API to write a corrupted record directly to
the quarantine topic. To be more concrete, you can create a separate <code>KafkaProducer</code> object outside the Streams client, and pass in this object
as well as the dead letter queue topic name into the <code>Properties</code> map, which then can be retrieved from the <code>configure</code> function call.
The drawback of this approach is that "manual" writes are side effects that are invisible to the Kafka Streams runtime library,
so they do not benefit from the end-to-end processing guarantees of the Streams API:</p>
<preclass="line-numbers"><codeclass="language-java">public class SendToDeadLetterQueueExceptionHandler implements ProcessingExceptionHandler {
KafkaProducer<byte[], byte[]> dlqProducer;
String dlqTopic;
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
final Record<?, ?> record,
final Exception exception) {
log.warn("Exception caught during message processing, sending to the dead queue topic; " +
<spanid="streams-developer-guide-processing-guarantee"></span><h4><aclass="toc-backref"href="#id25">processing.guarantee</a><aclass="headerlink"href="#processing-guarantee"title="Permalink to this headline"></a></h4>
<spanid="streams-developer-guide-processor-wrapper-class"></span><h4><aclass="toc-backref"href="#id42">processor.wrapper.class</a><aclass="headerlink"href="#processor-wrapper-class"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
A class or class name implementing the <codeclass="docutils literal"><spanclass="pre">ProcessorWrapper</span></code> interface. This feature allows you to wrap any of the
processors in the compiled topology, including both custom processor implementations and those created by Streams for DSL operators. This can be useful for logging or tracing
implementations since it allows access to the otherwise-hidden processor context for DSL operators, and also allows for injecting additional debugging information to an entire
application topology with just a single config.
</p>
<p>
IMPORTANT: This MUST be passed in when creating the topology, and will not be applied unless passed in to the appropriate topology-building constructor. You should
use the <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for DSL applications, and the
<codeclass="docutils literal"><spanclass="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI applications.
<spanid="replication-factor-parm"></span><h4><aclass="toc-backref"href="#id13">replication.factor</a><aclass="headerlink"href="#replication-factor"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is
repartitioned for aggregation. Replication is important for fault tolerance. Without replication even a single broker failure
may prevent progress of the stream processing application. It is recommended to use a similar replication factor as source topics.</p>
<dlclass="docutils">
<dt>Recommendation:</dt>
<dd>Increase the replication factor to 3 to ensure that the internal Kafka Streams topic can tolerate up to 2 broker failures.
<spanid="streams-developer-guide-rocksdb-config"></span><h4><aclass="toc-backref"href="#id20">rocksdb.config.setter</a><aclass="headerlink"href="#rocksdb-config-setter"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default
configuration for RocksDB, you can implement <codeclass="docutils literal"><spanclass="pre">RocksDBConfigSetter</span></code> and provide your custom class via <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
<p>Here is an example that adjusts the memory size consumed by RocksDB.</p>
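<p>A sketch of such a config setter is shown below, along the lines of the notes that follow; the 16 MB block cache size is an arbitrary illustration:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    // Keep the cache as a member variable so it can be closed in RocksDBConfigSetter#close.
    private final Cache cache = new LRUCache(16 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Reuse the existing table config so defaults such as the BloomFilter are kept.
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        // Modify the default block size.
        tableConfig.setBlockSize(16 * 1024L);
        // Do not let index and filter blocks grow unbounded.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        // Limit the number of memtables.
        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Close RocksObjects we constructed to avoid memory leaks.
        cache.close();
    }
}</code></pre>
<p>The class is then registered via <code class="docutils literal"><span class="pre">rocksdb.config.setter</span></code>, for example <code>streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class)</code>.</p>
<p>Notes for example:</p>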
<li><codeclass="docutils literal"><spanclass="pre">BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();</span></code> Get a reference to the existing table config rather than create a new one, so you don't accidentally overwrite defaults such as the <codeclass="docutils literal"><spanclass="pre">BloomFilter</span></code>, which is an important optimization.
<li><codeclass="docutils literal"><spanclass="pre">tableConfig.setBlockSize(16</span><spanclass="pre">*</span><spanclass="pre">1024L);</span></code> Modify the default <aclass="reference external"href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <aclass="reference external"href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">cache.close();</span></code> To avoid memory leaks, you must close any objects you constructed that extend org.rocksdb.RocksObject. See <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management">RocksJava docs</a> for more details.</li>
<h4><aclass="toc-backref"href="#id39">task.assignor.class</a><aclass="headerlink"href="#task-assignor-class"title="Permalink to this headline"></a></h4>
<blockquote>
<div>A task assignor class or class name implementing the
<code>org.apache.kafka.streams.processor.assignment.TaskAssignor</code> interface. Defaults to the
high-availability task assignor. One possible alternative implementation provided in Apache Kafka is the
<code>org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor</code>, which was the default task
assignor before KIP-441 and minimizes task movement at the cost of stateful task availability. Alternative implementations of
the task assignment algorithm can be plugged into the application by implementing a custom <code>TaskAssignor</code> and setting this config to the name of the custom task assignor class.
<h4><aclass="toc-backref"href="#id31">topology.optimization</a><aclass="headerlink"href="#topology-optimization"title="Permalink to this headline"></a></h4>
A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>), <code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>).
We recommend listing specific optimizations in the config for production code so that the structure of your topology will not change unexpectedly during upgrades of the Streams library.
</p>
<p>
These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
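<p>Putting both steps together, a sketch might look as follows (application ID and broker address are placeholders):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-optimized-app");       // placeholder
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");  // placeholder
// Enable all optimizations, or list specific ones, e.g. "reuse.ktable.source.topics,merge.repartition.topics"
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

StreamsBuilder streamsBuilder = new StreamsBuilder();
// ... define your processing topology on streamsBuilder here ...

// Pass the properties to build() as well, so the optimizations are actually applied:
Topology topology = streamsBuilder.build(properties);
KafkaStreams myStream = new KafkaStreams(topology, properties);</code></pre>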
<h4><aclass="toc-backref"href="#id31">windowed.inner.class.serde</a><aclass="headerlink"href="#windowed.inner.class.serde"title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface.
</p>
<p>
Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology.
<spanid="streams-developer-guide-upgrade-from"></span><h4><aclass="toc-backref"href="#id14">upgrade.from</a><aclass="headerlink"href="#upgrade-from"title="Permalink to this headline"></a></h4>
The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide.
You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the
newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path
when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
<h3><aclass="toc-backref"href="#id16">Kafka consumers, producer and admin client configuration parameters</a><aclass="headerlink"href="#kafka-consumers-and-producer-configuration-parameters"title="Permalink to this headline"></a></h3>
<p>You can specify parameters for the Kafka <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a>, <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a>,
and <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/kafka/clients/admin/package-summary.html">admin client</a> that are used internally.
The consumer, producer and admin client settings are defined by specifying parameters in a <codeclass="docutils literal"><spanclass="pre">StreamsConfig</span></code> instance.</p>
<p>In this example, the Kafka <aclass="reference external"href="/{{version}}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer session timeout</a> is configured to be 60000 milliseconds in the Streams settings:</p>
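<p>A sketch of that configuration (the broker address is a placeholder):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// Example of a "normal" setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);</code></pre>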
<h4><aclass="toc-backref"href="#id17">Naming</a><aclass="headerlink"href="#naming"title="Permalink to this headline"></a></h4>
<p>Some consumer, producer, and admin client configuration parameters use the same parameter name, and the Kafka Streams library itself also uses some parameters that share the same name as its embedded clients. For example, <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> and
<code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries for client requests.
You can avoid duplicate names by prefixing parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code>, <code class="docutils literal"><span class="pre">producer.</span></code>, or <code class="docutils literal"><span class="pre">admin.</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
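<p>As a sketch, where <code class="docutils literal"><span class="pre">PARAMETER_NAME</span></code> is a placeholder for any shared parameter name, the prefixes (or the equivalent helper methods) can be used like this:</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// Same value for the embedded consumer and producer
streamsSettings.put("PARAMETER_NAME", "value");
// Different values per client, using the plain prefixes...
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
// ...or the equivalent helper methods
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");</code></pre>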
<p>You could further separate consumer configuration by adding different prefixes:</p>
<ulclass="simple">
<li><codeclass="docutils literal"><spanclass="pre">main.consumer.</span></code> for main consumer which is the default consumer of stream source.</li>
<li><codeclass="docutils literal"><spanclass="pre">restore.consumer.</span></code> for restore consumer which is in charge of state store recovery.</li>
<li><codeclass="docutils literal"><spanclass="pre">global.consumer.</span></code> for global consumer which is used in global KTable construction.</li>
</ul>
<p>For example, if you only want to set the restore consumer's config without touching other consumers' settings, you can simply use the <code class="docutils literal"><span class="pre">restore.consumer.</span></code> prefix to set the config.</p>
<p>The same applies to <code class="docutils literal"><span class="pre">main.consumer.</span></code> and <code class="docutils literal"><span class="pre">global.consumer.</span></code>, if you only want to specify the config for one consumer type.</p>
<p> Additionally, to configure the internal repartition/changelog topics, you could use the <codeclass="docutils literal"><spanclass="pre">topic.</span></code> prefix, followed by any of the standard topic configs.</p>
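<p>As a sketch (the receive buffer and segment size values here are arbitrary illustrations):</p>
<pre class="line-numbers"><code class="language-java">import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// Only affect the restore consumer used for state store recovery
streamsSettings.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 65536);
// Configure internal repartition/changelog topics via the "topic." prefix
streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100 * 1024 * 1024);</code></pre>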
<h3><aclass="toc-backref"href="#id26">Parameters controlled by Kafka Streams</a><aclass="headerlink"href="#parameters-controlled-by-kafka-streams"title="Permalink to this headline"></a></h3>
<strong>For Non-EOS and EOS v2: </strong><codeclass="docutils literal"><spanclass="pre"><client.id>-StreamThread-<threadIdx>-producer</span></code>
</td>
</tr>
<tr>
<td>
<strong>For EOS v1: </strong><codeclass="docutils literal"><spanclass="pre"><client.id>-StreamThread-<threadIdx>-<taskId>-producer</span></code>
<spanid="streams-developer-guide-consumer-auto-commit"></span><h4><aclass="toc-backref"href="#id19">enable.auto.commit</a><aclass="headerlink"href="#enable-auto-commit"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config
value to <codeclass="docutils literal"><spanclass="pre">false</span></code>. Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
to commit the current processing state.</div></blockquote>