Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.4 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
(possible values are <code>"0.10.0" - "3.4"</code>) and during the second you remove it. This is required to safely handle three changes. The first is the introduction of the new cooperative rebalancing protocol of the embedded consumer. The second is a change in the foreign-key join serialization format.
Note that you will keep using the old eager rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details, please refer to
<a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>. The third is a change in the serialization format for an internal repartition topic. For more details, please refer to <a href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>:
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to the version from which it is being upgraded.</li>
<li> prepare your newly deployed {{fullDotVersion}} application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code></li>
<p> As an alternative, an offline upgrade is also possible. Upgrading from any version as old as 0.10.0.x to {{fullDotVersion}} in offline mode requires the following steps: </p>
Note: The cooperative rebalancing protocol has been the default since 2.4, but we have continued to support the
eager rebalancing protocol to provide users an upgrade path. This support will be dropped in a future release,
so any users still on the eager protocol should prepare to finish upgrading their applications to the cooperative protocol in version 3.1.
This only affects users who are still on a version older than 2.4, and users who have upgraded already but have not yet
removed the <code>upgrade.from</code> config that they set when upgrading from a version below 2.4.
Users fitting into the latter case will simply need to unset this config when upgrading beyond 3.1,
while users in the former case will need to follow a slightly different upgrade path if they attempt to upgrade from 2.3 or below to a version above 3.1.
Those applications will need to go through a bridge release, by first upgrading to a version between 2.4 and 3.1 and setting the <code>upgrade.from</code> config,
then removing that config and upgrading to the final version above 3.1. See <a href="https://issues.apache.org/jira/browse/KAFKA-8575">KAFKA-8575</a> for more details.
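<p>The two-phase use of <code>upgrade.from</code> can be sketched with plain <code>java.util.Properties</code>. This is a minimal illustration, not the required procedure: the class and method names are hypothetical, the application id is a placeholder, and only the config name <code>upgrade.from</code> comes from this guide.</p>

```java
import java.util.Properties;

// Hypothetical helper: builds the configuration for each rolling bounce.
class UpgradeFromPhases {
    // Config name as used in this guide.
    static final String UPGRADE_FROM = "upgrade.from";

    // First bounce: new binaries, with upgrade.from set to the old version.
    static Properties firstBounce(Properties base, String oldVersion) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(UPGRADE_FROM, oldVersion);
        return props;
    }

    // Second bounce: identical settings, with upgrade.from removed.
    static Properties secondBounce(Properties firstBounceProps) {
        Properties props = new Properties();
        props.putAll(firstBounceProps);
        props.remove(UPGRADE_FROM);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "my-app"); // placeholder application id
        Properties phase1 = firstBounce(base, "2.3");
        Properties phase2 = secondBounce(phase1);
        System.out.println("first bounce sets upgrade.from: " + phase1.containsKey(UPGRADE_FROM));
        System.out.println("second bounce sets upgrade.from: " + phase2.containsKey(UPGRADE_FROM));
    }
}
```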
<h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"></a><a href="#streams_notable_changes">Notable compatibility changes in past releases</a></h3>
Kafka Streams does not support running multiple instances of the same application as different processes on the same physical state directory. Starting in 2.8.0 (as well as 2.7.1 and 2.6.2),
this restriction will be enforced. If you wish to run more than one instance of Kafka Streams, you must configure them with different values for <code>state.dir</code>.
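<p>One way to give each instance on a host its own state directory is to derive <code>state.dir</code> from a per-instance name. The sketch below is only an assumption about how a deployment might do this; the base directory and instance names are hypothetical.</p>

```java
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper: one state directory per application instance.
class StateDirPerInstance {
    static Properties forInstance(String baseDir, String instanceName) {
        Properties props = new Properties();
        // Each instance gets its own sub-directory under the shared base directory.
        props.setProperty("state.dir", Paths.get(baseDir, instanceName).toString());
        return props;
    }

    public static void main(String[] args) {
        Properties first = forInstance("/var/lib/kafka-streams", "instance-1");
        Properties second = forInstance("/var/lib/kafka-streams", "instance-2");
        System.out.println(first.getProperty("state.dir"));
        System.out.println(second.getProperty("state.dir"));
    }
}
```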
In the deprecated <code>KStreamBuilder</code> class, when a <code>KTable</code> is created from a source topic via <code>KStreamBuilder.table()</code>, its materialized state store
reuses the source topic as its changelog topic for restoring, and disables logging to avoid appending new updates to the source topic. In the <code>StreamsBuilder</code> class introduced in 1.0, this behavior was changed
accidentally: the source topic is still reused as the changelog topic for restoring, but a separate changelog topic is also created, to which the update records from the source topic are appended. The 2.0 release fixed this issue, and users
can choose whether or not to reuse the source topic based on the <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code>: if you are upgrading from the old <code>KStreamBuilder</code> class and hence you need to change your code to use
the new <code>StreamsBuilder</code>, you should set this config value to <code>StreamsConfig#OPTIMIZE</code> to continue reusing the source topic; if you are upgrading from 1.0 or 1.1 where you are already using <code>StreamsBuilder</code> and hence have already
created a separate changelog topic, you should set this config value to <code>StreamsConfig#NO_OPTIMIZATION</code> when upgrading to {{fullDotVersion}} in order to use that changelog topic for restoring the state store.
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> can be found in <a href="https://cwiki.apache.org/confluence/x/V53LB">KIP-295</a>.
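<p>The choice described above can be sketched as a small helper. It uses literal strings instead of the <code>StreamsConfig</code> constants so that it compiles without Kafka on the classpath; the assumption here is that <code>TOPOLOGY_OPTIMIZATION_CONFIG</code> resolves to <code>"topology.optimization"</code>, <code>OPTIMIZE</code> to <code>"all"</code>, and <code>NO_OPTIMIZATION</code> to <code>"none"</code>. The class and method names are hypothetical.</p>

```java
import java.util.Properties;

// Hypothetical helper: picks the optimization setting based on the upgrade origin.
class TopologyOptimizationChoice {
    static Properties forUpgrade(boolean comingFromKStreamBuilder) {
        Properties props = new Properties();
        // Coming from KStreamBuilder: keep reusing the source topic as the changelog.
        // Coming from 1.0/1.1 StreamsBuilder: keep using the separate changelog topic.
        String value = comingFromKStreamBuilder ? "all" : "none";
        props.setProperty("topology.optimization", value);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(forUpgrade(true).getProperty("topology.optimization"));
        System.out.println(forUpgrade(false).getProperty("topology.optimization"));
    }
}
```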
enables you to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores.
This ensures that every internal resource is named before the Kafka Streams application is deployed, which is essential for upgrading your topology.
You can enable this feature via <code>StreamsConfig</code> using the <code>StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG</code> parameter.
When set to <code>true</code>, the application will refuse to start if any internal resource has an auto-generated name.
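<p>As a rough sketch, enabling the check amounts to one extra property. The literal key <code>"ensure.explicit.internal.resource.naming"</code> is an assumption about what <code>ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG</code> resolves to; prefer the constant in real code. The class and method names are hypothetical.</p>

```java
import java.util.Properties;

// Hypothetical helper: returns a copy of the config with explicit naming enforced.
class ExplicitNamingConfig {
    static Properties withExplicitNaming(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Assumed literal key; with "true" the application refuses to start
        // if any internal resource still has an auto-generated name.
        props.setProperty("ensure.explicit.internal.resource.naming", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = withExplicitNaming(new Properties());
        System.out.println(props.getProperty("ensure.explicit.internal.resource.naming"));
    }
}
```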
In this release two configs <code>default.deserialization.exception.handler</code> and <code>default.production.exception.handler</code> are deprecated, as they don't have any overwrites, which is described in
Previously, the <code>ProductionExceptionHandler</code> was not invoked on a (retriable) <code>TimeoutException</code>. With Kafka Streams 4.0, the handler is called, and the default handler returns <code>RETRY</code> so that existing behavior does not change.
However, a custom handler can now decide to break the infinite retry loop by returning either <code>CONTINUE</code> or <code>FAIL</code> (<a href="https://cwiki.apache.org/confluence/x/LQ6TEg">KIP-1065</a>).
deprecates the <code>ForeachProcessor</code> class.
This change is aimed at improving the organization and clarity of the Kafka Streams API by ensuring that internal classes are not exposed in public packages.
deprecate certain public doc description variables that are only used within the <code>StreamsConfig</code> or <code>TopologyConfig</code> classes.
Additionally, the unused variable <code>DUMMY_THREAD_INDEX</code> will also be deprecated.
</p>
<p>
Due to the removal of the already deprecated <code>#through</code> method in Kafka Streams, the <code>intermediateTopicsOption</code> of <code>StreamsResetter</code> tool in Apache Kafka is
introduces numeric counterparts to allow proper broker-side metric collection for Kafka Streams applications.
These metrics will be available at the <code>INFO</code> recording level, and a thread-level metric with a String value will be available for users leveraging Java Management Extensions (<code>JMX</code>).
</p>
<p>
In order to reduce storage overhead and improve API usability, a new method in the Java and Scala APIs that accepts a BiFunction for foreign key extraction is introduced by
KIP-1104 allows foreign key extraction from both the key and value in KTable joins in Apache Kafka.
Previously, foreign key joins in KTables only allowed extraction from the value, which led to data duplication and potential inconsistencies.
This enhancement introduces a new method in the Java and Scala APIs that accepts a BiFunction for foreign key extraction, enabling more intuitive and efficient joins.
The existing methods will be deprecated but not removed, ensuring backward compatibility. This change aims to reduce storage overhead and improve API usability.
the existing <code>Topology.AutoOffsetReset</code> is deprecated and replaced with a new class <code>org.apache.kafka.streams.AutoOffsetReset</code> to capture the reset strategies.
New methods will be added to the <code>org.apache.kafka.streams.Topology</code> and <code>org.apache.kafka.streams.kstream.Consumed</code> classes to support the new reset strategy.
These changes aim to provide more flexibility and efficiency in managing offsets, especially in scenarios involving long-term storage and infinite retention.
Upgraded RocksDB dependency to version 9.7.3 (from 7.9.2). This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes.
The <code>org.rocksdb.AccessHint</code> class, along with its associated methods, has been removed.
Several methods related to compressed block cache configuration in the <code>BlockBasedTableConfig</code> class have been removed, including <code>blockCacheCompressedNumShardBits</code>, <code>blockCacheCompressedSize</code>, and their corresponding setters. These functionalities are now consolidated under the <code>cache</code> option, and developers should configure their compressed block cache using the <code>setCache</code> method instead.
The <code>NO_FILE_CLOSES</code> field has been removed from the <code>org.rocksdb.TickerType</code> enum; as a result, the <code>number-open-files</code> metric does not work as expected. The <code>number-open-files</code> metric returns a constant -1 from now on until it is officially removed.
The <code>org.rocksdb.Options.setLogger()</code> method now accepts a <code>LoggerInterface</code> as a parameter instead of the previous <code>Logger</code>.
Some data types used in RocksDB's Java API have been modified. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations.
These changes are expected to be largely transparent to most Kafka Streams users. However, those employing advanced RocksDB customizations within their Streams applications, particularly through the <code>rocksdb.config.setter</code>, are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and adapt their configurations as needed. Specifically, users leveraging the removed <code>AccessHint</code> class, the removed methods from the <code>BlockBasedTableConfig</code> class, the <code>NO_FILE_CLOSES</code> field from <code>TickerType</code>, or relying on the previous signature of <code>setLogger()</code> will need to update their implementations.
enables you to provide a processing exception handler to manage exceptions during the processing of a record rather than throwing the exception all the way out of your streams application.
You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
The specified handler must implement the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.
Kafka Streams now allows customizing the logging interval of the stream-thread runtime summary, via the newly added config <code>log.summary.interval.ms</code>.
By default, the summary is logged every 2 minutes. More details can be found in
in which users can provide their customized implementation of the newly added <code>StandbyUpdateListener</code> interface to continuously monitor changes to standby tasks.
IQv2 supports <code>RangeQuery</code>, which allows specifying unbounded, bounded, or half-open key-ranges and returns data in unordered (byte[]-lexicographical) order (per partition).
<a href="https://cwiki.apache.org/confluence/x/eKCzDw">KIP-985</a> extends this functionality by adding <code>.withDescendingKeys()</code> and <code>.withAscendingKeys()</code> to allow users to receive data in descending or ascending order.
namely <code>TimestampedKeyQuery</code> and <code>TimestampedRangeQuery</code>. Both should be used to query a timestamped key-value store, to retrieve a <code>ValueAndTimestamp</code> result.
The existing <code>KeyQuery</code> and <code>RangeQuery</code> are changed to always return the value only for timestamped key-value stores.
The non-null key requirements for Kafka Streams join operators were relaxed as part of <a href="https://cwiki.apache.org/confluence/x/f5CzDw">KIP-962</a>.
<li>left join KStream-KStream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.</li>
<li>outer join KStream-KStream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value.</li>
<li>left-foreign-key join KTable-KTable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.</li>
<li>left join KStream-KTable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.</li>
<li>left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value.</li>
</ul>
Stream-DSL users who want to keep the current behavior can prepend a .filter() operator to the aforementioned operators and filter accordingly.
The following snippets illustrate how to keep the old behavior.
A new configuration option <code>balance_subtopology</code> for <code>rack.aware.assignment.strategy</code> was introduced in the 3.7 release.
For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
Rack aware task assignment can be enabled for <code>StickyTaskAssignor</code> or <code>HighAvailabilityTaskAssignor</code> to compute task assignments which can minimize cross rack traffic under certain conditions.
For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
</p>
<p>
IQv2 supports a <code>RangeQuery</code> that allows specifying unbounded, bounded, or half-open key-ranges. Users have to use <code>withUpperBound(K)</code>, <code>withLowerBound(K)</code>,
or <code>withNoBounds()</code> to specify half-open or unbounded ranges, but cannot use <code>withRange(K lower, K upper)</code> for the same.
improves the implementation of KTable aggregations. In general, an input KTable update triggers a result refinement for two rows;
however, prior to KIP-904, if both refinements happen to the same result row, two independent updates to the same row are applied, resulting in spurious intermediate results.
config <code>cache.max.bytes.buffering</code> in favor of the newly introduced config <code>statestore.cache.max.bytes</code>.
To improve monitoring, two new metrics <code>input-buffer-bytes-total</code> and <code>cache-size-bytes-total</code>
were added at the DEBUG level. Note that the KIP is only partially implemented in the 3.4.0 release, and config
<code>input.buffer.max.bytes</code> is not available yet.
</p>
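<p>Moving an existing setting from <code>cache.max.bytes.buffering</code> to <code>statestore.cache.max.bytes</code> could look like the sketch below. It only rewrites a <code>Properties</code> object; the class and method names are hypothetical, and leaving an already present new key untouched is a design choice of this sketch, not documented behavior.</p>

```java
import java.util.Properties;

// Hypothetical helper: carries the old cache setting over to the new config name.
class CacheConfigMigration {
    static final String OLD_KEY = "cache.max.bytes.buffering";
    static final String NEW_KEY = "statestore.cache.max.bytes";

    static Properties migrate(Properties original) {
        Properties props = new Properties();
        props.putAll(original);
        // Drop the deprecated key; reuse its value only if the new key is not set.
        Object oldValue = props.remove(OLD_KEY);
        if (oldValue != null && !props.containsKey(NEW_KEY)) {
            props.put(NEW_KEY, oldValue);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        old.setProperty(OLD_KEY, "10485760");
        System.out.println(migrate(old));
    }
}
```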
<p>
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356">KIP-837</a> enables you to multicast
result records to multiple partitions of downstream sink topics and adds functionality for choosing to drop result records without sending.
The <code>Integer StreamPartitioner.partition()</code> method is deprecated and replaced by the newly added
<code>Optional&lt;Set&lt;Integer&gt;&gt; StreamPartitioner.partitions()</code> method, which enables returning a set of partitions to send the record to.
However, the KIP was only partially implemented, and is now completed with the 3.2.0 release.
For a full list of available RocksDB metrics, please consult the <a href="/{{version}}/documentation/#kafka_streams_client_monitoring">monitoring documentation</a>.
</p>
<p>
Kafka Streams ships with RocksDB and in-memory store implementations and users can pick which one to use.
However, for the DSL, the choice is a per-operator one, making it cumbersome to switch from the default RocksDB
store to in-memory store for all operators, especially for larger topologies.
Interactive Queries may throw new exceptions for different errors:
</p>
<ul>
<li><code>UnknownStateStoreException</code>: If the specified store name does not exist in the topology, an <code>UnknownStateStoreException</code> will be thrown instead of the former <code>InvalidStateStoreException</code>.</li>
<li><code>StreamsNotStartedException</code>: If Streams state is <code>CREATED</code>, a <code>StreamsNotStartedException</code> will be thrown.</li>
<li><code>InvalidStateStorePartitionException</code>: If the specified partition does not exist, an <code>InvalidStateStorePartitionException</code> will be thrown.</li>
We deprecated the StreamsConfig <code>processing.guarantee</code> configuration value <code>"exactly_once"</code> (for EOS version 1) in favor of the improved EOS version 2, formerly configured via
<code>"exactly_once_beta"</code>. To avoid confusion about the term "beta" in the config name and highlight the production-readiness of EOS version 2, we have also renamed "eos-beta" to "eos-v2"
and deprecated the configuration value <code>"exactly_once_beta"</code>, replacing it with a new configuration value <code>"exactly_once_v2"</code>.
Users of exactly-once semantics should plan to migrate to the eos-v2 config and prepare for the removal of the deprecated configs in 4.0 or after at least a year
from the release of 3.0, whichever comes last. Note that eos-v2 requires broker version 2.5 or higher, like eos-beta, so users should begin to upgrade their Kafka cluster if necessary. See
<a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
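<p>The value mapping described above can be sketched as a small function over the <code>processing.guarantee</code> string. The class and method names are hypothetical; the sketch only rewrites the two deprecated values and does not check the broker version requirement.</p>

```java
// Hypothetical helper: maps deprecated processing.guarantee values to eos-v2.
class ProcessingGuaranteeMigration {
    static String migrate(String value) {
        switch (value) {
            case "exactly_once":       // EOS version 1, deprecated
            case "exactly_once_beta":  // former name of EOS version 2, deprecated
                return "exactly_once_v2";
            default:
                // Any other value is passed through unchanged.
                return value;
        }
    }

    public static void main(String[] args) {
        System.out.println(migrate("exactly_once"));
        System.out.println(migrate("exactly_once_beta"));
        System.out.println(migrate("at_least_once"));
    }
}
```

<p>Both deprecated values map to <code>"exactly_once_v2"</code>, so brokers must already be on 2.5 or higher before the migrated value is deployed.</p>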
Additionally, in older versions Kafka Streams emitted stream-stream left/outer join results eagerly. This behavior may lead to spurious left/outer join result records.
In this release, we changed the behavior to avoid spurious results, and left/outer join results are only emitted after the join window is closed, i.e., after the grace period has elapsed.
To maintain backward compatibility, the old API <code>JoinWindows#of(timeDifference)</code> preserves the old eager-emit behavior and only the new
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
and will be removed, as they were never intended for public use. We have also deprecated the <code>org.apache.kafka.streams.processor.TaskMetadata</code> class and introduced a new interface
<code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. This change was introduced to better reflect the fact that <code>TaskMetadata</code> was not meant to be instantiated outside
of the Kafka codebase.
Please note that the new <code>TaskMetadata</code> offers APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new
<code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#activeTasks</code> and <code>ThreadMetadata#standbyTasks</code>.
<code>org.apache.kafka.streams.processor.ThreadMetadata</code> class is also now deprecated and the newly introduced interface <code>org.apache.kafka.streams.ThreadMetadata</code> is to be used instead. In this new <code>ThreadMetadata</code>
interface, any reference to the deprecated <code>TaskMetadata</code> is replaced by the new interface.
Finally, also <code>org.apache.kafka.streams.state.StreamsMetadata</code> has been deprecated. Please migrate to the new <code>org.apache.kafka.streams.StreamsMetadata</code>.
We have deprecated several methods under <code>org.apache.kafka.streams.KafkaStreams</code> that returned the aforementioned deprecated classes:
<li>Users of <code>KafkaStreams#allMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForAllStreamsClients</code>.</li>
<li>Users of <code>KafkaStreams#allMetadataForStore(String)</code> are meant to migrate to the new <code>KafkaStreams#streamsMetadataForStore(String)</code>.</li>
<li>Users of <code>KafkaStreams#localThreadsMetadata</code> are meant to migrate to the new <code>KafkaStreams#metadataForLocalThreads</code>.</li>
</ul>
<p>See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> and <a href="https://cwiki.apache.org/confluence/x/XIrOCg">KIP-744</a> for more details.</p>
<li><code>--zookeeper</code> flag of the application reset tool: deprecated in Kafka 1.0.0 (<a href="https://cwiki.apache.org/confluence/x/6J1jB">KIP-198</a>).</li>
<li><code>--execute</code> flag of the application reset tool: deprecated in Kafka 1.1.0 (<a href="https://cwiki.apache.org/confluence/x/ApI7B">KIP-171</a>).</li>
<li><code>StreamsBuilder#addGlobalStore</code> (one overload): deprecated in Kafka 1.1.0 (<a href="https://cwiki.apache.org/confluence/x/vKpzB">KIP-233</a>).</li>
<li><code>ProcessorContext#forward</code> (some overloads): deprecated in Kafka 2.0.0 (<a href="https://cwiki.apache.org/confluence/x/Ih6HB">KIP-251</a>).</li>
<li><code>WindowBytesStoreSupplier#segments</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/mQU0BQ">KIP-319</a>).</li>
<li><code>segments, until, maintainMs</code> on <code>TimeWindows</code>, <code>JoinWindows</code>, and <code>SessionWindows</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/sQU0BQ">KIP-328</a>).</li>
<li> Overloaded <code>JoinWindows#of, before, after</code>, <code>SessionWindows#with</code>, <code>TimeWindows#of, advanceBy</code>, <code>UnlimitedWindows#startOn</code> and <code>KafkaStreams#close</code> with <code>long</code> typed parameters: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/IBNPBQ">KIP-358</a>).</li>
<li> Overloaded <code>KStream#groupBy, groupByKey</code> and <code>KTable#groupBy</code> with <code>Serialized</code> parameter: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/mgJ1BQ">KIP-372</a>).</li>
<li><code>Joined#named, name</code>: deprecated in Kafka 2.3.0 (<a href="https://cwiki.apache.org/confluence/x/xikYBQ">KIP-307</a>).</li>
<li><code>TopologyTestDriver#pipeInput, readOutput</code>, <code>OutputVerifier</code> and <code>ConsumerRecordFactory</code> classes (<a href="https://cwiki.apache.org/confluence/x/tI-iBg">KIP-470</a>).</li>
<li><code>KafkaClientSupplier#getAdminClient</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/x/V9XiBg">KIP-476</a>).</li>
<li> Overloaded <code>KStream#join, leftJoin, outerJoin</code> with <code>KStream</code> and <code>Joined</code> parameters: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/x/EBEgBw">KIP-479</a>).</li>
<li><code>WindowStore#put(K key, V value)</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/x/kcviBg">KIP-474</a>).</li>
<li><code>UsePreviousTimeOnInvalidTimestamp</code>: deprecated in Kafka 2.5.0 as renamed to <code>UsePartitionTimeOnInvalidTimestamp</code> (<a href="https://cwiki.apache.org/confluence/x/BxXABw">KIP-530</a>).</li>
<li> Overloaded <code>KafkaStreams#metadataForKey</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/x/Xg-jBw">KIP-535</a>).</li>
<li> Overloaded <code>KafkaStreams#store</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/x/QYyvC">KIP-562</a>).</li>
The following dependencies were removed from Kafka Streams:
</p>
<ul>
<li>Connect-json: Kafka Streams no longer has a compile-time dependency on the "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
We added two new methods to <code>KafkaStreams</code>, namely <code>KafkaStreams#addStreamThread()</code> and <code>KafkaStreams#removeStreamThread()</code> in
The <code>TimeWindowedDeserializer</code> constructor <code>TimeWindowedDeserializer(final Deserializer inner)</code>
was deprecated to encourage users to properly set their window size through <code>TimeWindowedDeserializer(final Deserializer inner, Long windowSize)</code>.
An additional streams config, <code>window.size.ms</code>, was added for users that cannot set the window size through
We changed the default value of <code>default.key.serde</code> and <code>default.value.serde</code> to be <code>null</code> instead of <code>ByteArraySerde</code>.
Users will now see a <code>ConfigException</code> if their serdes are not correctly configured through those configs or passed in explicitly.
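<p>Applications that relied on the old <code>ByteArraySerde</code> default can set both configs explicitly. A minimal sketch, assuming the serde classes ship under <code>org.apache.kafka.common.serialization.Serdes</code>; the class names are passed as strings so the example compiles without Kafka on the classpath, and the helper name is hypothetical.</p>

```java
import java.util.Properties;

// Hypothetical helper: sets both default serdes explicitly.
class ExplicitDefaultSerdes {
    // Binary name of the nested byte-array serde class (assumed location).
    static final String BYTE_ARRAY_SERDE =
        "org.apache.kafka.common.serialization.Serdes$ByteArraySerde";

    static Properties withByteArrayDefaults(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Restores the previous default now that the configs default to null.
        props.setProperty("default.key.serde", BYTE_ARRAY_SERDE);
        props.setProperty("default.value.serde", BYTE_ARRAY_SERDE);
        return props;
    }

    public static void main(String[] args) {
        Properties props = withByteArrayDefaults(new Properties());
        System.out.println(props.getProperty("default.key.serde"));
        System.out.println(props.getProperty("default.value.serde"));
    }
}
```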
For more highly available stateful applications, we've modified the task assignment algorithm to delay the movement of stateful active tasks to instances
that aren't yet caught up with that task's state. Instead, to migrate a task from one instance to another (e.g., when scaling out),
Streams will assign a warmup replica to the target instance so it can begin restoring the state while the active task stays available on an instance
that already had the task. The instances warming up tasks will communicate their progress to the group so that, once ready, Streams can move active
tasks to their new owners in the background. Check out <a href="https://cwiki.apache.org/confluence/x/0i4lBg">KIP-441</a>
for full details, including several new configs for control over this new feature.
</p>
<p>
New end-to-end latency metrics have been added. These task-level metrics will be logged at the INFO level and report the min and max end-to-end latency of a record at the beginning/source node(s)
and end/terminal node(s) of a task. See <a href="https://cwiki.apache.org/confluence/x/gBkRCQ">KIP-613</a> for more information.
If you need to write into and read back from a topic that you manage, you can fall back to using <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.
<h3 class="anchor-heading"><a id="streams_api_changes_250" class="anchor-link"></a><a href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
Deprecated <code>KafkaStreams.store(String, QueryableStoreType)</code> and replaced it with <code>KafkaStreams.store(StoreQueryParameters)</code> to allow querying
<h3 class="anchor-heading"><a id="streams_api_changes_240" class="anchor-link"></a><a href="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
With the introduction of incremental cooperative rebalancing, Streams no longer requires all tasks be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance only those tasks which are to be migrated to another consumer
for overall load balance will need to be closed and revoked. This changes the semantics of the <code>StateListener</code> a bit, as it will not necessarily transition to <code>REBALANCING</code> at the beginning of a rebalance anymore. Note that
this means IQ will now be available at all times except during state restoration, including while a rebalance is in progress. If restoration is occurring when a rebalance begins, we will continue to actively restore the state stores and/or process
standby tasks during a cooperative rebalance. Note that with this new rebalancing protocol, you may sometimes see a rebalance be followed by a second short rebalance that ensures all tasks are safely distributed. For details on please see
<h3 class="anchor-heading"><a id="streams_api_changes_230" class="anchor-link"></a><a href="#streams_api_changes_230">Streams API changes in 2.3.0</a></h3>
The new public interfaces <code>inMemoryWindowStore()</code> and <code>inMemorySessionStore()</code> are added to <code>Stores</code> and provide the built-in in-memory window or session store.
First add this line to your properties <code>properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);</code>, as you have done before.
To improve operator semantics, new store types are added that allow storing an additional timestamp per key-value pair or window.
Some DSL operators (for example KTables) are using those new stores.
Hence, you can now retrieve the last update timestamp via Interactive Queries if you specify
<code>TimestampedKeyValueStoreType</code> or <code>TimestampedWindowStoreType</code> as your <code>QueryableStoreType</code>.
While this change is mainly transparent, there are some corner cases that may require code changes:
<strong>Caution: If you receive an untyped store and use a cast, you might need to update your code to cast to the correct type.
Otherwise, you might get an exception similar to
<code>java.lang.ClassCastException: class org.apache.kafka.streams.state.ValueAndTimestamp cannot be cast to class YOUR-VALUE-TYPE</code>
upon getting a value from the store.</strong>
Additionally, <code>TopologyTestDriver#getStateStore()</code> only returns non-built-in stores and throws an exception if a built-in store is accessed.
The new version allows specifying more RocksDB configurations, including <code>WriteBufferManager</code>, which helps to limit RocksDB off-heap memory usage.
<h3 class="anchor-heading"><a id="streams_api_changes_220" class="anchor-link"></a><a href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3>
We've simplified the <code>KafkaStreams#state</code> transition diagram during the startup phase in 2.2.0: in older versions the state transitions from <code>CREATED</code> to <code>RUNNING</code>, then to <code>REBALANCING</code> to get the first
stream task assignment, and then back to <code>RUNNING</code>; starting in 2.2.0 it transitions from <code>CREATED</code> directly to <code>REBALANCING</code> and then to <code>RUNNING</code>.
If you have registered a <code>StateListener</code> that captures state transition events, you may need to adjust your listener implementation for this simplification (in practice, your listener logic is very unlikely to be affected).
</p>
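<p>A listener that only reacts to the new state, and never assumes a particular previous state, behaves the same under both startup sequences. The sketch below illustrates this with a stand-in enum; it is not the real <code>KafkaStreams.State</code>, and the class is hypothetical. The <code>(newState, oldState)</code> argument order mirrors the <code>StateListener#onChange</code> callback.</p>

```java
// Hypothetical readiness tracker driven by state-change callbacks.
class StartupListenerSketch {
    // Stand-in for the three states named in the text, not the real enum.
    enum State { CREATED, REBALANCING, RUNNING }

    private boolean ready = false;

    // Decides on the new state only, so the path taken to reach it does not matter.
    void onChange(State newState, State oldState) {
        if (newState == State.RUNNING) {
            ready = true;
        } else if (newState == State.REBALANCING) {
            ready = false;
        }
    }

    boolean isReady() {
        return ready;
    }

    public static void main(String[] args) {
        StartupListenerSketch listener = new StartupListenerSketch();
        // Startup sequence as of 2.2.0: CREATED -> REBALANCING -> RUNNING.
        listener.onChange(State.REBALANCING, State.CREATED);
        listener.onChange(State.RUNNING, State.REBALANCING);
        System.out.println("ready: " + listener.isReady());
    }
}
```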
<p>
In <code>WindowedSerdes</code>, we've added a new static constructor to return a <code>TimeWindowSerde</code> with configurable window size. This is to help users to construct time window serdes to read directly from a time-windowed store's changelog.
used in a try-with-resource statement. For a full list of public interfaces that get impacted please read <a href="https://cwiki.apache.org/confluence/x/-AeQBQ">KIP-376</a>.
<h3 class="anchor-heading"><a id="streams_api_changes_210" class="anchor-link"></a><a href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
We've added a new class <code>Grouped</code> and deprecated <code>Serialized</code>. The intent of adding <code>Grouped</code> is the ability to
name repartition topics created when performing aggregation operations. Users can name the potential repartition topic using the
<code>Grouped#as()</code> method which takes a <code>String</code> and is used as part of the repartition topic name. The resulting repartition
topic name will still follow the pattern of <code>${application-id}-&lt;name&gt;-repartition</code>. The <code>Grouped</code> class is now favored over
<code>Serialized</code> in <code>KStream#groupByKey()</code>, <code>KStream#groupBy()</code>, and <code>KTable#groupBy()</code>.
Note that Kafka Streams does not automatically create repartition topics for aggregation operations.
Additionally, we've updated the <code>Joined</code> class with a new method <code>Joined#withName</code>
enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table join. For more details repartition
As a result, we've updated the Kafka Streams Scala API and removed the <code>Serialized</code> class in favor of adding <code>Grouped</code>.
If you rely only on the implicit <code>Serialized</code>, you just need to recompile; if you pass in <code>Serialized</code> explicitly, you will have to make code changes.
We've added a new config named <code>max.task.idle.ms</code> to allow users to specify how to handle out-of-order data within a task that may be processing multiple
topic-partitions (see the <a href="/{{version}}/documentation/streams/core-concepts.html#streams_out_of_ordering">Out-of-Order Handling</a> section for more details).
The default value is set to <code>0</code>, to favor minimized latency over synchronization between multiple input streams from topic-partitions.
If users would like to wait for a longer time when some of the topic-partitions do not have data available to process and hence cannot determine its corresponding stream time,
We've added the missing <code>SessionBytesStoreSupplier#retentionPeriod()</code> to be consistent with the <code>WindowBytesStoreSupplier</code> which allows users to get the specified retention period for session-windowed stores.
We've also added the missing <code>StoreBuilder#withCachingDisabled()</code> to allow users to turn off caching for their customized stores.
and replaced them with <code>Instant</code> and <code>Duration</code> parameters for improved semantics.
Some old methods based on <code>long</code> are deprecated and users are encouraged to update their code.
<br/>
In particular, aggregation windows (hopping/tumbling/unlimited time windows and session windows) as well as join windows now take <code>Duration</code>
arguments to specify window size, hop, and gap parameters.
Also, window sizes and retention times are now specified as <code>Duration</code> type in <code>Stores</code> class.
The <code>Window</code> class has new methods <code>#startTime()</code> and <code>#endTime()</code> that return window start/end timestamp as <code>Instant</code>.
We deprecated <code>KafkaStreams#close(...)</code> and replaced it with <code>KafkaStreams#close(Duration)</code> that accepts a single timeout argument
Finally, <code>Stores#persistentWindowStore(...)</code> was deprecated and replaced with a new overload that no longer allows specifying the number of segments.
We've added an overloaded <code>StreamsBuilder#build</code> method that accepts an instance of <code>java.util.Properties</code> with the intent of using the
executing a <code>StreamsBuilder#build</code> call, Kafka Streams can optimize the physical plan of the topology, provided the <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code>
config is set to <code>StreamsConfig#OPTIMIZE</code>. By setting <code>StreamsConfig#OPTIMIZE</code> in addition to the <code>KTable</code> optimization of
reusing the source topic as the changelog topic, the topology may be optimized to merge redundant repartition topics into one
repartition topic. The original no parameter version of <code>StreamsBuilder#build</code> is still available for those who wish to not
optimize their topology. Note that enabling optimization of the topology may require you to do an application reset when redeploying the application. For more
We are introducing static membership to Kafka Streams users. This feature reduces unnecessary rebalances during normal application upgrades or rolling bounces.
<h3 class="anchor-heading"><a id="streams_api_changes_200" class="anchor-link"></a><a href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <a href="#streams_api_changes_200">Streams API changes</a> below).
If you have customized window store implementations that extend the <code>ReadOnlyWindowStore</code> interface, you need to make code changes.
</p>
<p>
In addition, if you are using Java 8 method references in your Kafka Streams code, you might need to update your code to resolve method ambiguities.
Hot-swapping only the jar file might not work in this case.
See below for a complete list of <a href="#streams_api_changes_200">2.0.0</a>
API and semantic changes that allow you to advance your application and/or simplify your code base.
</p>
<p>
We moved the <code>Consumed</code> class from <code>org.apache.kafka.streams</code> to <code>org.apache.kafka.streams.kstream</code>
as it was mistakenly placed in the previous release. If your code already uses it, a simple one-line change to your import statement is needed.
</p>
<p>
In 2.0.0 we have also removed some public APIs that were deprecated prior to 1.0.x.
We have added methods to <code>ReadOnlyWindowStore</code> that allow querying a single window's key-value pair.
Users who have customized window store implementations of the above interface need to update their code to implement the newly added method as well.
We have added public <code>WindowedSerdes</code> to allow users to read from / write to a topic storing windowed table changelogs directly.
In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
to let users specify inner serdes if the default serde classes are windowed serdes.
We've added message header support in the <code>Processor API</code> in Kafka 2.0.0. In particular, we have added a new API <code>ProcessorContext#headers()</code>
which returns a <code>Headers</code> object that keeps track of the headers of the source topic's message that is being processed. Through this object, users can manipulate
the headers map that is being propagated throughout the processor topology as well. For more details please feel free to read
the <a href="/{{version}}/documentation/streams/developer-guide/processor-api.html#accessing-processor-context">Developer Guide</a> section.
Kafka 2.0.0 allows you to manipulate timestamps of output records using the Processor API (<a href="https://cwiki.apache.org/confluence/x/Ih6HB">KIP-251</a>).
To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
The two existing overloads <code>#forward(Object key, Object value, String childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated and a new overload <code>#forward(Object key, Object value, To to)</code> was added.
The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
Forwarding based on child index is not supported in the new API any longer.
We have added support to allow routing records dynamically to Kafka topics. More specifically, in both the lower-level <code>Topology#addSink</code> and higher-level <code>KStream#to</code> APIs, we have added variants that
take a <code>TopicNameExtractor</code> instance instead of a specific <code>String</code> typed topic name, such that for each received record from the upstream processor, the library will dynamically determine which Kafka topic to write to
based on the record's key and value, as well as record context. Note that all the Kafka topics that may possibly be used are still considered as user topics and hence required to be pre-created. In addition to that, we have modified the
<code>StreamPartitioner</code> interface to add the topic name parameter since the topic name now may not be known beforehand; users who have customized implementations of this interface would need to update their code while upgrading their application
<a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.
Instead of relying on data retention, Kafka Streams now uses the new purge data API to delete consumed data from those topics and keep the used storage small.
We have modified the <code>ProcessorStateManager#register(...)</code> signature and removed the deprecated <code>loggingEnabled</code> boolean parameter as it is specified in the <code>StoreBuilder</code>.
Users who used this function to register their state stores into the processor topology need to simply update their code and remove this parameter from the caller.
</p>
<p>
Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala. It wraps core Kafka Streams DSL types to make it easier to call when
interoperating with Scala code. For example, it includes higher order functions as parameters for transformations avoiding the need for anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11,
automatic conversion between Java and Scala collection types, a way
<li><code>KafkaStreams#toString</code> no longer returns the topology and runtime metadata; to get topology metadata users can call <code>Topology#describe()</code> and to get thread runtime metadata users can call <code>KafkaStreams#localThreadsMetadata</code> (they have been deprecated since 1.0.0).
For detailed guidance on how to update your code please read <a href="#streams_api_changes_100">here</a></li>
<li><code>TopologyBuilder</code> and <code>KStreamBuilder</code> are removed and replaced by <code>Topology</code> and <code>StreamsBuilder</code> respectively (they have been deprecated since 1.0.0).
For detailed guidance on how to update your code please read <a href="#streams_api_changes_100">here</a></li>
<li><code>StateStoreSupplier</code> is removed and replaced with <code>StoreBuilder</code> (it has been deprecated since 1.0.0);
and the corresponding <code>Stores#create</code> and <code>KStream, KTable, KGroupedStream</code> overloaded functions that use it have also been removed.
For detailed guidance on how to update your code please read <a href="#streams_api_changes_100">here</a></li>
<li><code>KStream, KTable, KGroupedStream</code> overloaded functions that require serde and other specifications explicitly are removed and replaced with simpler overloaded functions that use <code>Consumed, Produced, Serialized, Materialized, Joined</code> (they have been deprecated since 1.0.0).
For detailed guidance on how to update your code please read <a href="#streams_api_changes_100">here</a></li>
<li><code>Processor#punctuate</code>, <code>Transformer#punctuate</code>, <code>ValueTransformer#punctuate</code> and <code>ProcessorContext#schedule(long)</code> are removed and replaced by <code>ProcessorContext#schedule(long, PunctuationType, Punctuator)</code> (they were deprecated in 1.0.0). </li>
<li>The second <code>boolean</code> typed parameter "loggingEnabled" in <code>ProcessorContext#register</code> has been removed; users can now use <code>StoreBuilder#withLoggingEnabled, withLoggingDisabled</code> to specify the behavior when they create the state store. </li>
<li><code>KTable#writeAs, print, foreach, to, through</code> are removed; users can call <code>KTable#toStream()#writeAs</code> instead for the same purpose (they have been deprecated since 0.11.0.0).
For a detailed list of removed APIs please read <a href="#streams_api_changes_0110">here</a></li>
<li><code>StreamsConfig#KEY_SERDE_CLASS_CONFIG, VALUE_SERDE_CLASS_CONFIG, TIMESTAMP_EXTRACTOR_CLASS_CONFIG</code> are removed and replaced with <code>StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG, DEFAULT_VALUE_SERDE_CLASS_CONFIG, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG</code> respectively (they are deprecated since 0.11.0.0). </li>
<li><code>StreamsConfig#ZOOKEEPER_CONNECT_CONFIG</code> is removed as we do not need the ZooKeeper dependency in Streams any more (it has been deprecated since 0.10.2.0). </li>
<h3 class="anchor-heading"><a id="streams_api_changes_110" class="anchor-link"></a><a href="#streams_api_changes_110">Streams API changes in 1.1.0</a></h3>
We have added methods to <code>ReadOnlyWindowStore</code> that allow querying <code>WindowStore</code>s without the necessity of providing keys.
Users who have customized window store implementations of the above interface need to update their code to implement the newly added method as well.
There is a new artifact <code>kafka-streams-test-utils</code> providing the <code>TopologyTestDriver</code>, <code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code> classes.
You can include the new artifact as a regular dependency of your unit tests and use the test driver to test the business logic of your Kafka Streams application.
enables you to provide configuration parameters for the embedded admin client created by Kafka Streams, similar to the embedded producer and consumer clients.
You can provide the configs via <code>StreamsConfig</code> by adding the configs with the prefix <code>admin.</code> as defined by <code>StreamsConfig#adminClientPrefix(String)</code>
to distinguish them from configurations of other clients that share the same config names.
<li><code>transformValues</code> methods have been added to <code>KTable</code>. Similar to those on <code>KStream</code>, these methods allow for richer, stateful, value transformation similar to the Processor API.</li>
<li> A method has been provided such that it will return the store name associated with the <code>GlobalKTable</code> or <code>null</code> if the store name is non-queryable. </li>
<li> added overload for the constructor that allows overriding the <code>Time</code> object used for tracking system wall-clock time; this is useful for unit testing your application code. </li>
</ul>
<p> New methods in <code>KafkaClientSupplier</code>: </p>
<ul>
<li> added <code>getAdminClient(config)</code> that allows overriding the <code>AdminClient</code> used for administrative requests such as internal topic creation, etc. </li>
</ul>
<p>New error handling for exceptions during production:</p>
<ul>
<li>added interface <code>ProductionExceptionHandler</code> that allows implementors to decide whether Streams should <code>FAIL</code> or <code>CONTINUE</code> when certain exceptions occur while trying to produce.</li>
<li>provided an implementation, <code>DefaultProductionExceptionHandler</code>, that always fails, preserving the existing behavior by default.</li>
<li>changing which implementation is used can be done by setting <code>default.production.exception.handler</code> to the fully qualified name of a class implementing this interface.</li>
</ul>
<h3 class="anchor-heading"><a id="streams_api_changes_100" class="anchor-link"></a><a href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
a new method <code>merge()</code> has been created in <code>KStream</code> as the StreamsBuilder class's <code>StreamsBuilder#merge()</code> has been removed.
The method signature was also changed: instead of passing multiple <code>KStream</code>s into the method at once, only a single <code>KStream</code> is accepted.
<li>retrieve the current runtime information about the local threads via <code>localThreadsMetadata()</code></li>
<li>observe the restoration of all state stores via <code>setGlobalStateRestoreListener()</code>, in which users can provide their customized implementation of the <code>org.apache.kafka.streams.processor.StateRestoreListener</code> interface</li>
They have been deprecated in favor of using the new classes/methods <code>localThreadsMetadata()</code> / <code>ThreadMetadata</code> (returning runtime information) and
you should no longer pass in <code>Serde</code> to <code>KStream#print</code> operations.
If you can't rely on using <code>toString</code> to print your keys and values, you should instead provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call.
The Processor API was extended to allow users to schedule <code>punctuate</code> functions either based on data-driven <b>stream time</b> or wall-clock time.
As a result, the original <code>ProcessorContext#schedule</code> is deprecated in favor of a new overloaded function that accepts a user-customizable <code>Punctuator</code> callback interface, which triggers its <code>punctuate</code> API method periodically based on the <code>PunctuationType</code>.
The <code>PunctuationType</code> determines what notion of time is used for the punctuation scheduling: either <a href="/{{version}}/documentation/streams/core-concepts#streams_time">stream time</a> or wall-clock time (by default, <b>stream time</b> is configured to represent event time via <code>TimestampExtractor</code>).
In addition, the <code>punctuate</code> function inside <code>Processor</code> is also deprecated.
</p>
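<p>
To make the difference between the two notions of time concrete, here is a deliberately simplified toy model, not Kafka Streams code. It assumes a periodic callback fires once for every full interval by which the chosen clock advances; the <code>PunctuationModel</code> class, the 10 second interval, and the elapsed times are illustrative assumptions, and the real scheduling behavior may differ in detail.
</p>

```java
// Toy model (not Kafka Streams code): counts how often a periodic callback
// would fire when the chosen clock advances by elapsedMs.
class PunctuationModel {

    // Assumes one firing per full interval of elapsed time on the chosen clock.
    static long firings(long elapsedMs, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        return elapsedMs / intervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 10_000L; // hypothetical 10 second punctuation interval

        // Wall-clock time: only the real time spent processing matters.
        System.out.println(firings(20_000L, intervalMs)); // 20 s of processing
        System.out.println(firings(5_000L, intervalMs));  // 5 s of processing

        // Stream time: if no records arrive, stream time does not advance.
        long streamTimeAdvanceMs = 0L;
        System.out.println(firings(streamTimeAdvanceMs, intervalMs));
    }
}
```

<p>
In this model the wall-clock count depends only on how long processing takes, while the stream-time count depends only on how far the record timestamps move stream time forward.
</p>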
<p>
Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>) and hence the <code>punctuate</code> function was data-driven only, because stream time is determined (and advanced forward) by the timestamps derived from the input data.
If there is no data arriving at the processor, the stream time does not advance and hence punctuation is not triggered.
On the other hand, when wall-clock time (i.e. <code>PunctuationType.WALL_CLOCK_TIME</code>) is used, <code>punctuate</code> is triggered purely based on wall-clock time.
For example, if the <code>Punctuator</code> function is scheduled every 10 seconds based on <code>PunctuationType.WALL_CLOCK_TIME</code> and 60 records are processed within 20 seconds,
<code>punctuate</code> is called 2 times (once every 10 seconds);
if those 60 records are processed within 5 seconds, <code>punctuate</code> is not called at all.
Users can schedule multiple <code>Punctuator</code> callbacks with different <code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code> multiple times inside processor's <code>init()</code> method.
If you are monitoring task-level or processor-node / state-store-level Streams metrics, please note that the metrics sensor names and hierarchy were changed:
The task ids, store names and processor names are no longer in the sensor metrics names, but instead are added as tags of the sensors to achieve a consistent metrics hierarchy.
As a result you may need to make corresponding code changes on your metrics reporting and monitoring tools when upgrading to 1.0.0.
enables you to provide a default exception handler for deserialization errors when reading data from Kafka rather than throwing the exception all the way out of your streams application.
You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
The specified handler must implement the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.
enables you to provide topic configuration parameters for any topics created by Kafka Streams.
This includes repartition and changelog topics.
You can provide the configs via the <code>StreamsConfig</code> by adding the configs with the prefix as defined by <code>StreamsConfig#topicPrefix(String)</code>.
Any properties in the <code>StreamsConfig</code> with the prefix will be applied when creating internal topics.
Any configs that aren't topic configs will be ignored.
If you already use <code>StateStoreSupplier</code> or <code>Materialized</code> to provide configs for changelogs, then they will take precedence over those supplied in the config.
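<p>
A minimal sketch of such a configuration using plain <code>java.util.Properties</code>. It assumes that <code>StreamsConfig#topicPrefix(String)</code> prepends the literal prefix <code>topic.</code> to a topic-level config name; the <code>topicPrefix</code> helper below is a local stand-in for that method, and the application id, broker address, and config values are hypothetical.
</p>

```java
import java.util.Properties;

class InternalTopicConfigSketch {

    // Local stand-in for StreamsConfig#topicPrefix(String);
    // assumes the prefix is the literal string "topic.".
    static String topicPrefix(String topicConfigName) {
        return "topic." + topicConfigName;
    }

    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        // Prefixed entries are intended for the internal (repartition and changelog) topics.
        props.put(topicPrefix("segment.bytes"), "52428800");
        props.put(topicPrefix("retention.ms"), "86400000");
        return props;
    }

    public static void main(String[] args) {
        buildConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```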
<h3 class="anchor-heading"><a id="streams_api_changes_0110" class="anchor-link"></a><a href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
<li> added overloads for <code>addSource()</code> that allow defining a <code>TimestampExtractor</code> per source node </li>
<li> added overloads for <code>addGlobalStore()</code> that allow defining a <code>TimestampExtractor</code> per source node associated with the global store </li>
<p><code>[client.Id]</code> is either set via Streams configuration parameter <code>client.id</code> or defaults to <code>[application.id]-[processId]</code> (<code>[processId]</code> is a random UUID). </p>
<h3 class="anchor-heading"><a id="streams_api_changes_01021" class="anchor-link"></a><a href="#streams_api_changes_01021">Notable changes in 0.10.2.1</a></h3>
<li> The default config values of embedded producer's <code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been changed to improve the resiliency of a Kafka Streams application </li>
<h3 class="anchor-heading"><a id="streams_api_changes_0102" class="anchor-link"></a><a href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3>
<li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka Streams application no longer interacts with ZooKeeper for topic management but uses the new broker admin protocol
<li> added overloads for <code>stream()</code> and <code>table()</code> that allow defining an <code>auto.offset.reset</code> policy per input stream/table </li>
<li> added method <code>globalKTable()</code> to create a <code>GlobalKTable</code></li>
<p> Aligned <code>null</code>-key handling for <code>KTable</code> joins: </p>
<ul>
<li> like all other KTable operations, <code>KTable-KTable</code> joins do not throw an exception on <code>null</code> key records anymore, but drop those records silently </li>
</ul>
<p> New window type <em>Session Windows</em>: </p>
<ul>
<li> added class <code>SessionWindows</code> to specify session windows </li>
</ul>
<h3 class="anchor-heading"><a id="streams_api_changes_0101" class="anchor-link"></a><a href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
<p> Stream grouping and aggregation split into two methods: </p>
<ul>
<li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
<li> new: KStream#groupByKey() plus KGroupedStream #aggregate(), #reduce(), and #count() </li>
<li> Example: stream.countByKey() changes to stream.groupByKey().count() </li>
</ul>
<p> Auto Repartitioning: </p>
<ul>
<li> a call to through() after a key-changing operator and before an aggregation/join is no longer required </li>
<li> Example: stream.selectKey(...).through(...).countByKey() changes to stream.selectKey().groupByKey().count() </li>
</ul>
<p> TopologyBuilder: </p>
<ul>
<li> methods #sourceTopics(String applicationId) and #topicGroups(String applicationId) got simplified to #sourceTopics() and #topicGroups() </li>
</ul>
<p> DSL: new parameter to specify state store names: </p>
<ul>
<li> The new Interactive Queries feature requires specifying a store name for all source KTables and window aggregation result KTables (the previous parameter "operator/window name" is now the storeName) </li>
<li> KStreamBuilder#table(String topic) changes to #table(String topic, String storeName) </li>
<li> KTable#through(String topic) changes to #through(String topic, String storeName) </li>
<li> Example: stream.countByKey(TimeWindows.of("windowName", 1000)) changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName") </li>
</ul>
<p> Windowing: </p>
<ul>
<li> Windows are not named anymore: TimeWindows.of("name", 1000) changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store names) </li>
<li> JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li>
</ul>
<h3 class="anchor-heading"><a id="streams_api_broker_compat" class="anchor-link"></a><a href="#streams_api_broker_compat">Streams API broker compatibility</a></h3>
<p>The following table shows which versions of the Kafka Streams API are compatible with various Kafka broker versions. For Kafka Streams versions older than 2.4.x, please check the <a href="/39/documentation/streams/upgrade-guide">3.9 upgrade document</a>.</p>