<spanid="streams-developer-guide-configuration"></span><h1>Configuring a Streams Application<aclass="headerlink"href="#configuring-a-streams-application"title="Permalink to this headline"></a></h1>
<p>Kafka and Kafka Streams configuration options must be set before running a Streams application. You configure Kafka Streams by specifying parameters in a <code class="docutils literal"><span class="pre">StreamsConfig</span></code> instance.</p>
<olclass="arabic">
<li><pclass="first">Create a <codeclass="docutils literal"><spanclass="pre">java.util.Properties</span></code> instance.</p>
</li>
<li><pclass="first">Set the <aclass="reference internal"href="#streams-developer-guide-required-configs"><spanclass="std std-ref">parameters</span></a>.</p>
</li>
<li><pclass="first">Construct a <codeclass="docutils literal"><spanclass="pre">StreamsConfig</span></code> instance from the <codeclass="docutils literal"><spanclass="pre">Properties</span></code> instance. For example:</p>
<spanid="streams-developer-guide-required-configs"></span><h2>Configuration parameter reference<aclass="headerlink"href="#configuration-parameter-reference"title="Permalink to this headline"></a></h2>
<p>This section contains the most common Streams configuration parameters. For a full reference, see the <aclass="reference external"href="../../../javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> Javadocs.</p>
<h3><aclass="toc-backref"href="#id3">Required configuration parameters</a><aclass="headerlink"href="#required-configuration-parameters"title="Permalink to this headline"></a></h3>
<p>Here are the required Streams configuration parameters.</p>
<tdcolspan="2">An identifier for the stream processing application. Must be unique within the Kafka cluster.</td>
<td>None</td>
</tr>
<trclass="row-odd"><td>bootstrap.servers</td>
<td>Required</td>
<tdcolspan="2">A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.</td>
<td>None</td>
</tr>
</tbody>
</table>
<divclass="section"id="application-id">
<h4><aclass="toc-backref"href="#id4">application.id</a><aclass="headerlink"href="#application-id"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>(Required) The application ID. Each stream processing application must have a unique ID. The same ID must be given to
all instances of the application. It is recommended to use only alphanumeric characters, <code class="docutils literal"><span class="pre">.</span></code> (dot), <code class="docutils literal"><span class="pre">-</span></code> (hyphen), and <code class="docutils literal"><span class="pre">_</span></code> (underscore). Examples: <code class="docutils literal"><span class="pre">"hello_world"</span></code>, <code class="docutils literal"><span class="pre">"hello_world-v1.0.0"</span></code></p>
<p>This ID is used in the following places to isolate resources used by the application from others:</p>
<ul class="simple">
<li>As the default Kafka consumer and producer <code class="docutils literal"><span class="pre">client.id</span></code> prefix</li>
<li>As the Kafka consumer <code class="docutils literal"><span class="pre">group.id</span></code> for coordination</li>
<li>As the name of the subdirectory in the state directory (cf. <code class="docutils literal"><span class="pre">state.dir</span></code>)</li>
<li>As the prefix of internal Kafka topic names</li>
</ul>
<dlclass="docutils">
<dt>Tip:</dt>
<dd>When an application is updated, the <codeclass="docutils literal"><spanclass="pre">application.id</span></code> should be changed unless you want to reuse the existing data in internal topics and state stores.
For example, you could embed the version information within <codeclass="docutils literal"><spanclass="pre">application.id</span></code>, as <codeclass="docutils literal"><spanclass="pre">my-app-v1.0.0</span></code> and <codeclass="docutils literal"><spanclass="pre">my-app-v1.0.2</span></code>.</dd>
</dl>
</div></blockquote>
</div>
<divclass="section"id="bootstrap-servers">
<h4><aclass="toc-backref"href="#id5">bootstrap.servers</a><aclass="headerlink"href="#bootstrap-servers"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>(Required) The Kafka bootstrap servers. This is the same <aclass="reference external"href="http://kafka.apache.org/documentation.html#producerconfigs">setting</a> that is used by the underlying producer and consumer clients to connect to the Kafka cluster.
<spanid="streams-developer-guide-optional-configs"></span><h3><aclass="toc-backref"href="#id6">Optional configuration parameters</a><aclass="headerlink"href="#optional-configuration-parameters"title="Permalink to this headline"></a></h3>
<p>Here are the optional <ahref="../../../javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> javadocs, sorted by level of importance:</p>
<ul class="simple">
<li>High: These parameters can have a significant impact on performance. Take care when deciding the values of these parameters.</li>
<li>Medium: These parameters can have some impact on performance. Your specific environment will determine how much tuning effort should be focused on these parameters.</li>
<li>Low: These parameters have a less general or less significant impact on performance.</li>
</ul>
<tdcolspan="2">Exception handling class that implements the <codeclass="docutils literal"><spanclass="pre">DeserializationExceptionHandler</span></code> interface.</td>
<tdcolspan="2">Exception handling class that implements the <codeclass="docutils literal"><spanclass="pre">ProductionExceptionHandler</span></code> interface.</td>
<tdcolspan="2">Default serializer/deserializer class for record keys, implements the <codeclass="docutils literal"><spanclass="pre">Serde</span></code> interface (see also value.serde).</td>
<tdcolspan="2">Partition grouper class that implements the <codeclass="docutils literal"><spanclass="pre">PartitionGrouper</span></code> interface.</td>
<tdcolspan="2">Timestamp extractor class that implements the <codeclass="docutils literal"><spanclass="pre">TimestampExtractor</span></code> interface.</td>
<tdcolspan="2">Default serializer/deserializer class for record values, implements the <codeclass="docutils literal"><spanclass="pre">Serde</span></code> interface (see also key.serde).</td>
<spanid="streams-developer-guide-deh"></span><h4><aclass="toc-backref"href="#id7">default.deserialization.exception.handler</a><aclass="headerlink"href="#default-deserialization-exception-handler"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default deserialization exception handler allows you to manage record exceptions that fail to deserialize. This
can be caused by corrupt data, incorrect serialization logic, or unhandled record types. These exception handlers
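<p>For example, a sketch of configuring the log-and-continue handler, assuming the standard <code class="docutils literal"><span class="pre">StreamsConfig</span></code> constant:</p>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

Properties settings = new Properties();
// Log and skip records that cannot be deserialized instead of failing the application
settings.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
             LogAndContinueExceptionHandler.class);
</pre>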
<spanid="streams-developer-guide-peh"></span><h4><aclass="toc-backref"href="#id24">default.production.exception.handler</a><aclass="headerlink"href="#default-production-exception-handler"title="Permalink to this headline"></a></h4>
such as attempting to produce a record that is too large. By default, Kafka provides and uses the <aclass="reference external"href="../../../javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a>
that always fails when these exceptions occur.</p>
<p>Each exception handler can return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning <code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams
should ignore the issue and continue processing. If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p>
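<p>A sketch of such a handler, assuming the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> contract described above; the class name is illustrative:</p>
<pre class="brush: java;">
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public void configure(final Map&lt;String, ?&gt; configs) {}

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord&lt;byte[], byte[]&gt; record,
                                                     final Exception exception) {
        // CONTINUE tells Streams to drop the oversized record and keep processing;
        // any other exception still shuts the application down.
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}
</pre>
<p>You would then register the handler via the <code class="docutils literal"><span class="pre">default.production.exception.handler</span></code> parameter, e.g. <code class="docutils literal"><span class="pre">settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreRecordTooLargeHandler.class);</span></code>.</p>
</div></blockquote>
</div>
<div class="section" id="default-key-serde">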
<h4><aclass="toc-backref"href="#id8">default.key.serde</a><aclass="headerlink"href="#default-key-serde"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for record keys. Serialization and deserialization in Kafka Streams happens
whenever data needs to be materialized, for example:</p>
<blockquote>
<div><ulclass="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code> and <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
</ul>
<p>This is discussed in more detail in <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data types and serialization</span></a>.</p>
</div></blockquote>
</div></blockquote>
</div>
<divclass="section"id="default-value-serde">
<h4><aclass="toc-backref"href="#id9">default.value.serde</a><aclass="headerlink"href="#default-value-serde"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for record values. Serialization and deserialization in Kafka Streams
happens whenever data needs to be materialized, for example:</p>
<ulclass="simple">
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <codeclass="docutils literal"><spanclass="pre">StreamsBuilder#stream()</span></code> and <codeclass="docutils literal"><spanclass="pre">KStream#to()</span></code> methods).</li>
<li>Whenever data is read from or written to a <em>state store</em>.</li>
</ul>
<p>This is discussed in more detail in <aclass="reference internal"href="datatypes.html#streams-developer-guide-serdes"><spanclass="std std-ref">Data types and serialization</span></a>.</p>
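<p>For example, a sketch of setting string defaults for both key and value serdes, assuming the standard <code class="docutils literal"><span class="pre">Serdes</span></code> factory and <code class="docutils literal"><span class="pre">StreamsConfig</span></code> constants:</p>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serde for record keys (unless overridden per operation)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for record values (unless overridden per operation)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
</pre>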
</div></blockquote>
</div>
<divclass="section"id="num-standby-replicas">
<spanid="streams-developer-guide-standby-replicas"></span><h4><aclass="toc-backref"href="#id10">num.standby.replicas</a><aclass="headerlink"href="#num-standby-replicas"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The number of standby replicas. Standby replicas are shadow copies of local state stores. Kafka Streams attempts to create the
specified number of replicas and keep them up to date as long as there are enough instances running.
Standby replicas are used to minimize the latency of task failover. A task that was previously running on a failed instance is
preferably restarted on an instance that hosts its standby replicas, so that restoring the local state store from the
changelog is minimized. Details about how Kafka Streams makes use of the standby replicas to minimize the cost of
resuming tasks on failover can be found in the <a class="reference internal" href="../architecture.html#streams-architecture-state"><span class="std std-ref">State</span></a> section.</div></blockquote>
</div>
<divclass="section"id="num-stream-threads">
<h4><aclass="toc-backref"href="#id11">num.stream.threads</a><aclass="headerlink"href="#num-stream-threads"title="Permalink to this headline"></a></h4>
<blockquote>
<div>This specifies the number of stream threads in an instance of the Kafka Streams application. The stream processing code runs in these thread.
For more information about Kafka Streams threading model, see <aclass="reference internal"href="../architecture.html#streams-architecture-threads"><spanclass="std std-ref">Threading Model</span></a>.</div></blockquote>
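<p>As a small illustration, a sketch of configuring two processing threads and one standby replica per task, assuming the standard <code class="docutils literal"><span class="pre">StreamsConfig</span></code> constants:</p>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Run two stream threads in this application instance
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
// Keep one standby copy of each local state store on another instance
settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
</pre>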
</div>
<divclass="section"id="partition-grouper">
<spanid="streams-developer-guide-partition-grouper"></span><h4><aclass="toc-backref"href="#id12">partition.grouper</a><aclass="headerlink"href="#partition-grouper"title="Permalink to this headline"></a></h4>
<blockquote>
<div>A partition grouper creates a list of stream tasks from the partitions of source topics, where each created task is assigned a group of source topic partitions.
The default implementation provided by Kafka Streams is <a class="reference external" href="../../../javadoc/org/apache/kafka/streams/processor/DefaultPartitionGrouper.html">DefaultPartitionGrouper</a>.
It assigns each task one partition from each of the source topics. The generated number of tasks equals the largest
number of partitions among the input topics. Usually an application does not need to customize the partition grouper.</div></blockquote>
</div>
<divclass="section"id="replication-factor">
<spanid="replication-factor-parm"></span><h4><aclass="toc-backref"href="#id13">replication.factor</a><aclass="headerlink"href="#replication-factor"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>This specifies the replication factor of internal topics that Kafka Streams creates when local states are used or a stream is
repartitioned for aggregation. Replication is important for fault tolerance. Without replication, even a single broker failure
may prevent progress of the stream processing application. It is recommended to use a replication factor similar to that of the source topics.</p>
<dl class="docutils">
<dt>Recommendation:</dt>
<dd>Increase the replication factor to 3 to ensure that the internal Kafka Streams topics can tolerate up to 2 broker failures.
Note that you will require more storage space as well (3 times more with a replication factor of 3).</dd>
</dl>
</div></blockquote>
</div>
<divclass="section"id="state-dir">
<h4><aclass="toc-backref"href="#id14">state.dir</a><aclass="headerlink"href="#state-dir"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The state directory. Kafka Streams persists local states under the state directory. Each application has a subdirectory on its hosting
machine that is located under the state directory. The name of the subdirectory is the application ID. The state stores associated
with the application are created under this subdirectory.</div></blockquote>
</div>
<divclass="section"id="timestamp-extractor">
<spanid="streams-developer-guide-timestamp-extractor"></span><h4><aclass="toc-backref"href="#id15">timestamp.extractor</a><aclass="headerlink"href="#timestamp-extractor"title="Permalink to this headline"></a></h4>
<div><p>A timestamp extractor pulls a timestamp from an instance of <aclass="reference external"href="../../../javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.
This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer
client since
<aclass="reference external"href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka version 0.10</a>.
Depending on the setting of Kafka’s server-side <codeclass="docutils literal"><spanclass="pre">log.message.timestamp.type</span></code> broker and <codeclass="docutils literal"><spanclass="pre">message.timestamp.type</span></code> topic parameters,
this extractor provides you with:</p>
<ulclass="simple">
<li><strong>event-time</strong> processing semantics if <codeclass="docutils literal"><spanclass="pre">log.message.timestamp.type</span></code> is set to <codeclass="docutils literal"><spanclass="pre">CreateTime</span></code> aka “producer time”
(which is the default). This represents the time when a Kafka producer sent the original message. If you use Kafka’s
official producer client, the timestamp represents milliseconds since the epoch.</li>
<li><strong>ingestion-time</strong> processing semantics if <codeclass="docutils literal"><spanclass="pre">log.message.timestamp.type</span></code> is set to <codeclass="docutils literal"><spanclass="pre">LogAppendTime</span></code> aka “broker
time”. This represents the time when the Kafka broker received the original message, in milliseconds since the epoch.</li>
</ul>
<p>The <codeclass="docutils literal"><spanclass="pre">FailOnInvalidTimestamp</span></code> extractor throws an exception if a record contains an invalid (i.e. negative) built-in
timestamp, because Kafka Streams would not process this record but silently drop it. Invalid built-in timestamps can
occur for various reasons: if for example, you consume a topic that is written to by pre-0.10 Kafka producer clients
or by third-party producer clients that don’t support the new Kafka 0.10 message format yet; another situation where
this may happen is after upgrading your Kafka cluster from <codeclass="docutils literal"><spanclass="pre">0.9</span></code> to <codeclass="docutils literal"><spanclass="pre">0.10</span></code>, where all the data that was generated
with <codeclass="docutils literal"><spanclass="pre">0.9</span></code> does not include the <codeclass="docutils literal"><spanclass="pre">0.10</span></code> message timestamps.</p>
<p>If you have data with invalid timestamps and want to process it, then there are two alternative extractors available.
Both work on built-in timestamps, but handle invalid timestamps differently.</p>
This extractor does not actually “extract” a timestamp from the consumed record but rather returns the current time in
milliseconds from the system clock (think: <codeclass="docutils literal"><spanclass="pre">System.currentTimeMillis()</span></code>), which effectively means Streams will operate
on the basis of the so-called <strong>processing-time</strong> of events.</p>
<p>You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of
messages. If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or
estimate a timestamp. Returning a negative timestamp will result in data loss – the corresponding record will not be
processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via
<code class="docutils literal"><span class="pre">previousTimestamp</span></code> (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom
<code class="docutils literal"><span class="pre">TimestampExtractor</span></code> implementation:</p>
<h3><aclass="toc-backref"href="#id16">Kafka consumers and producer configuration parameters</a><aclass="headerlink"href="#kafka-consumers-and-producer-configuration-parameters"title="Permalink to this headline"></a></h3>
<p>You can specify parameters for the Kafka <aclass="reference external"href="../../../javadoc/org/apache/kafka/clients/consumer/package-summary.html">consumers</a> and <aclass="reference external"href="../../../javadoc/org/apache/kafka/clients/producer/package-summary.html">producers</a> that are used internally. The consumer and producer settings
<p>In this example, the Kafka <aclass="reference external"href="../../../javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG">consumer session timeout</a> is configured to be 60000 milliseconds in the Streams settings:</p>
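<p>A sketch of what this looks like, using the standard client configuration constants:</p>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// Example of a "normal" Kafka Streams setting
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings used by this Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
</pre>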
<h4><aclass="toc-backref"href="#id17">Naming</a><aclass="headerlink"href="#naming"title="Permalink to this headline"></a></h4>
<p>Some consumer and producer configuration parameters use the same parameter name. For example, <codeclass="docutils literal"><spanclass="pre">send.buffer.bytes</span></code> and
<codeclass="docutils literal"><spanclass="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <codeclass="docutils literal"><spanclass="pre">request.timeout.ms</span></code> and <codeclass="docutils literal"><spanclass="pre">retry.backoff.ms</span></code> control retries
for client request. You can avoid duplicate names by prefix parameter names with <codeclass="docutils literal"><spanclass="pre">consumer.</span></code> or <codeclass="docutils literal"><spanclass="pre">producer</span></code> (e.g., <codeclass="docutils literal"><spanclass="pre">consumer.send.buffer.bytes</span></code> and <codeclass="docutils literal"><spanclass="pre">producer.send.buffer.bytes</span></code>).</p>
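<p>A sketch of both styles, assuming the standard <code class="docutils literal"><span class="pre">StreamsConfig.consumerPrefix()</span></code> and <code class="docutils literal"><span class="pre">StreamsConfig.producerPrefix()</span></code> helpers:</p>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// Same setting applied to both the internal consumer and producer
streamsSettings.put("send.buffer.bytes", 1024 * 1024);
// Different values for consumer and producer, via plain string prefixes...
streamsSettings.put("consumer.send.buffer.bytes", 1024 * 1024);
streamsSettings.put("producer.send.buffer.bytes", 64 * 1024);
// ...or via the prefix helpers and the client config constants
streamsSettings.put(StreamsConfig.consumerPrefix(ConsumerConfig.SEND_BUFFER_CONFIG), 1024 * 1024);
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.SEND_BUFFER_CONFIG), 64 * 1024);
</pre>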
<spanid="streams-developer-guide-consumer-auto-commit"></span><h4><aclass="toc-backref"href="#id19">enable.auto.commit</a><aclass="headerlink"href="#enable-auto-commit"title="Permalink to this headline"></a></h4>
<blockquote>
<div>The consumer auto commit. To guarantee at-least-once processing semantics and turn off auto commits, Kafka Streams overrides this consumer config
value to <codeclass="docutils literal"><spanclass="pre">false</span></code>. Consumers will only commit explicitly via <em>commitSync</em> calls when the Kafka Streams library or a user decides
to commit the current processing state.</div></blockquote>
</div>
<divclass="section"id="rocksdb-config-setter">
<spanid="streams-developer-guide-rocksdb-config"></span><h4><aclass="toc-backref"href="#id20">rocksdb.config.setter</a><aclass="headerlink"href="#rocksdb-config-setter"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default
configuration for RocksDB, implement <codeclass="docutils literal"><spanclass="pre">RocksDBConfigSetter</span></code> and provide your custom class via <aclass="reference external"href="../../../javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html">rocksdb.config.setter</a>.</p>
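<p>Here is a sketch that adjusts the memory consumed by RocksDB, assuming the <code class="docutils literal"><span class="pre">RocksDBConfigSetter#setConfig()</span></code> contract; the class name and the specific values are illustrative:</p>
<pre class="brush: java;">
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public static class CustomRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map&lt;String, Object&gt; configs) {
        // See #1 below.
        BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
        // See #2 below.
        tableConfig.setBlockSize(16 * 1024L);
        // See #3 below.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        // See #4 below.
        options.setMaxWriteBufferNumber(2);
    }
}
</pre>
<p>You would then register the custom class, e.g. via <code class="docutils literal"><span class="pre">settings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);</span></code>.</p>
<p>Notes for example:</p>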
<li><codeclass="docutils literal"><spanclass="pre">BlockBasedTableConfig</span><spanclass="pre">tableConfig</span><spanclass="pre">=</span><spanclass="pre">new</span><spanclass="pre">org.rocksdb.BlockBasedTableConfig();</span></code> Reduce block cache size from the default, shown <aclass="reference external"href="https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L81">here</a>, as the total number of store RocksDB databases is partitions (40) * segments (3) = 120.</li>
<li><codeclass="docutils literal"><spanclass="pre">tableConfig.setBlockSize(16</span><spanclass="pre">*</span><spanclass="pre">1024L);</span></code> Modify the default <aclass="reference external"href="https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L82">block size</a> per these instructions from the <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">tableConfig.setCacheIndexAndFilterBlocks(true);</span></code> Do not let the index and filter blocks grow unbounded. For more information, see the <aclass="reference external"href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB GitHub</a>.</li>
<li><codeclass="docutils literal"><spanclass="pre">options.setMaxWriteBufferNumber(2);</span></code> See the advanced options in the <aclass="reference external"href="https://github.com/facebook/rocksdb/blob/8dee8cad9ee6b70fd6e1a5989a8156650a70c04f/include/rocksdb/advanced_options.h#L103">RocksDB GitHub</a>.</li>
<h3><aclass="toc-backref"href="#id21">Recommended configuration parameters for resiliency</a><aclass="headerlink"href="#recommended-configuration-parameters-for-resiliency"title="Permalink to this headline"></a></h3>
<p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in face of broker failures:</p>
<p>Increasing the replication factor to 3 ensures that the internal Kafka Streams topic can tolerate up to 2 broker failures. Changing the acks setting to “all”
guarantees that a record will not be lost as long as one replica is alive. The tradeoff from moving to the default values to the recommended ones is
that some performance and more storage space (3x with the replication factor of 3) are sacrificed for more resiliency.</p>
<divclass="section"id="acks">
<h4><aclass="toc-backref"href="#id22">acks</a><aclass="headerlink"href="#acks"title="Permalink to this headline"></a></h4>
<blockquote>
<div><p>The number of acknowledgments that the leader must have received before considering a request complete. This controls
the durability of records that are sent. The possible values are:</p>
<ulclass="simple">
<li><codeclass="docutils literal"><spanclass="pre">acks=0</span></code> The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the <codeclass="docutils literal"><spanclass="pre">retries</span></code> configuration will not take effect (as the client won’t generally know of any failures). The offset returned for each record will always be set to <codeclass="docutils literal"><spanclass="pre">-1</span></code>.</li>
<li><codeclass="docutils literal"><spanclass="pre">acks=1</span></code> The leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost.</li>
<li><codeclass="docutils literal"><spanclass="pre">acks=all</span></code> The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee.</li>
</ul>
<p>For more information, see the <aclass="reference external"href="https://kafka.apache.org/documentation/#producerconfigs">Kafka Producer documentation</a>.</p>
</div></blockquote>
</div>
<divclass="section"id="id2">
<h4><aclass="toc-backref"href="#id23">replication.factor</a><aclass="headerlink"href="#id2"title="Permalink to this headline"></a></h4>
<blockquote>
<div>See the <aclass="reference internal"href="#replication-factor-parm"><spanclass="std std-ref">description here</span></a>.</div></blockquote>
<p>You define these settings via <codeclass="docutils literal"><spanclass="pre">StreamsConfig</span></code>:</p>
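<p>A sketch of the recommended overrides, using the standard <code class="docutils literal"><span class="pre">StreamsConfig</span></code> and producer configuration constants:</p>
<pre class="brush: java;">
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsSettings = new Properties();
// Replicate internal Streams topics 3 ways so they survive up to 2 broker failures
streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// Require acknowledgement from all in-sync replicas before a produce request succeeds
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
</pre>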