<li>The <a href="#adminapi">Admin</a> API allows managing and inspecting topics, brokers, and other Kafka objects.</li>
</ol>
Kafka exposes all its functionality over a language-independent protocol which has clients available in many programming languages. However, only the Java clients are maintained as part of the main Kafka project; the others are available as independent open source projects. A list of non-Java clients is available <a href="https://cwiki.apache.org/confluence/x/3gDVAQ">here</a>.
<li><code>DELETE /connectors/{name}</code> - delete a connector, halting all tasks and deleting its configuration</li>
<li><code>GET /connectors/{name}/topics</code> - get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued</li>
<li><code>PUT /connectors/{name}/topics/reset</code> - send a request to empty the set of active topics of a connector</li>
<li>Offsets management endpoints (see <a href="https://cwiki.apache.org/confluence/x/Io3GDQ">KIP-875</a> for more details):
<ul>
<li><code>GET /connectors/{name}/offsets</code> - get the current offsets for a connector</li>
<li><code>DELETE /connectors/{name}/offsets</code> - reset the offsets for a connector. The connector must exist and must be in the stopped state (see <a href="#connect_stopconnector"><code>PUT /connectors/{name}/stop</code></a>)</li>
<li><code>PUT /admin/loggers/{name}</code> - set the log level for the specified logger</li>
</ul>
<p>See <a href="https://cwiki.apache.org/confluence/x/-4tTBw">KIP-495</a> for more details about the admin logger REST APIs.</p>
<p>For the complete specification of the Kafka Connect REST API, see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a>.</p>
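<p>As an illustration of the offsets endpoints above, the body returned by <code>GET /connectors/{name}/offsets</code> for a sink connector has the shape described in KIP-875; the connector, topic, and values below are made up, and the response is saved to a local file so the parsing can run without a Connect worker:</p>

```shell
# Sample body that GET /connectors/my-connector/offsets might return for a
# sink connector (shape per KIP-875; topic name and values are hypothetical).
cat > sample_offsets.json <<'EOF'
{"offsets": [{"partition": {"kafka_topic": "orders", "kafka_partition": 0},
              "offset": {"kafka_offset": 42}}]}
EOF
# Pull out the committed Kafka offset for the first listed partition.
grep -o '"kafka_offset": *[0-9]*' sample_offsets.json | grep -o '[0-9]*$'
```

<p>Against a real worker, the same body would come from <code>curl http://localhost:8083/connectors/my-connector/offsets</code> (worker host and connector name being deployment-specific).</p>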
<h6>Supporting exactly-once</h6>
<p>With the passing of <a href="https://cwiki.apache.org/confluence/x/Vg0rCQ">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
<h6>Defining transaction boundaries</h6>
<p>
Starting with 2.3.0, Kafka Connect by default uses a protocol that incrementally balances the connectors and tasks across the Connect workers, affecting only tasks that are new, to be removed, or need to move from one worker to another.
Other tasks are not stopped and restarted during the rebalance, as they would have been with the old protocol.
For example, it validates that the number of records in the batch is the same as what the batch header states. This batch of messages is then written to disk in compressed form. The batch will remain compressed in the log and it will also be transmitted to the
consumer in compressed form. The consumer decompresses any compressed data that it receives.
<p>
Kafka supports GZIP, Snappy, LZ4 and ZStandard compression protocols. More details on compression can be found <a href="https://cwiki.apache.org/confluence/x/S5qoAQ">here</a>.
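<p>Compression is typically enabled on the producer; a minimal sketch of the relevant client setting (the choice of ZStandard here is illustrative):</p>

```
# producer.properties -- zstd shown as an example; gzip, snappy and lz4 also work
compression.type=zstd
```

<p>Topics also accept a <code>compression.type</code> config; its default value <code>producer</code> retains whatever codec the producer used.</p>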
<h3 class="anchor-heading"><a id="theproducer" class="anchor-link"></a><a href="#theproducer">4.4 The Producer</a></h3>
the broker version and then throws an UnsupportedException. If you accidentally configure duplicate ids for different instances,
a fencing mechanism on the broker side will inform your duplicate client to shut down immediately by triggering a <code>org.apache.kafka.common.errors.FencedInstanceIdException</code>.
There are a plethora of tools that integrate with Kafka outside the main distribution. The <a href="https://cwiki.apache.org/confluence/x/Ri3VAQ">ecosystem page</a> lists many of these, including stream processing systems, Hadoop integration, monitoring, and deployment tools.
<p>
To reset the offsets of a consumer group, the "--reset-offsets" option can be used.
This option supports one consumer group at a time, and requires defining a scope with --all-topics or --topic. One scope must be selected, unless you use the '--from-file' scenario. Also, first make sure that the consumer instances are inactive.
See <a href="https://cwiki.apache.org/confluence/x/_iEIB">KIP-122</a> for more details.
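<p>For example, a dry run that would rewind one group on one topic to the earliest offsets (the group and topic names are made up):</p>

```
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group my-group --topic my-topic \
    --reset-offsets --to-earliest --dry-run
```

<p>Replacing <code>--dry-run</code> with <code>--execute</code> applies the change.</p>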
The following sections describe how to configure and run a dedicated MirrorMaker cluster. If you want to run MirrorMaker within an existing Kafka Connect cluster or other supported deployment setups, please refer to <a href="https://cwiki.apache.org/confluence/x/ooOzBQ">KIP-382: MirrorMaker 2.0</a> and be aware that the names of configuration settings may vary between deployment modes.
<li><a href="https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java">DefaultTopicFilter</a> for topics, <a href="https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java">DefaultGroupFilter</a> for consumer groups</li>
<li>Example configuration settings in <a href="https://github.com/apache/kafka/blob/trunk/config/connect-mirror-maker.properties">connect-mirror-maker.properties</a>, <a href="https://cwiki.apache.org/confluence/x/ooOzBQ">KIP-382: MirrorMaker 2.0</a></li>
In either case, it is also necessary to enable intra-cluster communication between the MirrorMaker nodes, as described in <a href="https://cwiki.apache.org/confluence/x/4g5RCg">KIP-710</a>. To do this, the <code>dedicated.mode.enable.internal.rest</code> property must be set to <code>true</code>. In addition, many of the REST-related <a href="https://kafka.apache.org/documentation/#connectconfigs">configuration properties available for Kafka Connect</a> can be specified in the MirrorMaker config. For example, to enable intra-cluster communication in a MirrorMaker cluster with each node listening on port 8080 of its local machine, the following should be added to the MirrorMaker config file:
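<p>Per KIP-710, that addition would look like the following:</p>

```
dedicated.mode.enable.internal.rest=true
listeners=http://localhost:8080
```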
<li>Use <a href="#security_authz">prefix ACLs</a> (cf. <a href="https://cwiki.apache.org/confluence/x/QpvLB">KIP-290</a>) to enforce a common prefix for topic names. For example, team A may only be permitted to create topics whose names start with <code>payments.teamA.</code>.</li>
<li>Define a custom <code>CreateTopicPolicy</code> (cf. <a href="https://cwiki.apache.org/confluence/x/Iw8IB">KIP-108</a> and the setting <a href="#brokerconfigs_create.topic.policy.class.name">create.topic.policy.class.name</a>) to enforce strict naming patterns. These policies provide the most flexibility and can cover complex patterns and rules to match an organization's needs.</li>
<li>Disable topic creation for normal users by denying it with an ACL, and then rely on an external process to create topics on behalf of users (e.g., scripting or your favorite automation toolkit).</li>
<li>It may also be useful to disable the Kafka feature to auto-create topics on demand by setting <code>auto.create.topics.enable=false</code> in the broker configuration. Note that you should not rely solely on this option.</li>
</ul>
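<p>A minimal broker-side sketch combining the last two options above (the policy class name is hypothetical):</p>

```
# server.properties
auto.create.topics.enable=false
create.topic.policy.class.name=com.example.policy.TeamPrefixTopicPolicy
```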
Security settings for Kafka fall into three main categories:
<ol>
<li><strong>Encryption</strong> of data transferred between Kafka brokers and Kafka clients, between brokers, and between brokers and other optional tools.</li>
<li><strong>Authentication</strong> of connections from Kafka clients and applications to Kafka brokers, as well as connections between Kafka brokers.</li>
<li><strong>Authorization</strong> of client operations such as creating, deleting, and altering the configuration of topics; writing events to or reading events from a topic; creating and deleting ACLs. Administrators can also define custom policies to put in place additional restrictions, such as a <code>CreateTopicPolicy</code> and <code>AlterConfigPolicy</code> (see <a href="https://cwiki.apache.org/confluence/x/Iw8IB">KIP-108</a> and the settings <a href="#brokerconfigs_create.topic.policy.class.name">create.topic.policy.class.name</a>, <a href="#brokerconfigs_alter.config.policy.class.name">alter.config.policy.class.name</a>).</li>
</ol>
<p>
</p>
<p>
<strong>Client quotas:</strong> Kafka supports different types of (per-user principal) client quotas. Because a client's quotas apply irrespective of which topics the client is writing to or reading from, they are a convenient and effective tool to allocate resources in a multi-tenant cluster. <a href="#design_quotascpu">Request rate quotas</a>, for example, help to limit a user's impact on broker CPU usage by limiting the time a broker spends on the <a href="/protocol.html">request handling path</a> for that user, after which throttling kicks in. In many situations, isolating users with request rate quotas has a bigger impact in multi-tenant clusters than setting incoming/outgoing network bandwidth quotas, because excessive broker CPU usage for processing requests reduces the effective bandwidth the broker can serve. Furthermore, administrators can also define quotas on topic operations—such as create, delete, and alter—to prevent Kafka clusters from being overwhelmed by highly concurrent topic operations (see <a href="https://cwiki.apache.org/confluence/x/6DLcC">KIP-599</a> and the quota type <code>controller_mutation_rate</code>).
</p>
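<p>For example, a controller mutation-rate quota might be set per user like this (the principal and the rate value are illustrative):</p>

```
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
    --add-config 'controller_mutation_rate=10' \
    --entity-type users --entity-name alice
```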
<p>
<p>In the tiered storage approach, a Kafka cluster is configured with two tiers of storage: local and remote.
The local tier is the same as the current Kafka that uses the local disks on the Kafka brokers to store the log segments.
The new remote tier uses external storage systems, such as HDFS or S3, to store the completed log segments.
Please check <a href="https://cwiki.apache.org/confluence/x/KJDQBQ">KIP-405</a> for more information.
<p>Starting from Apache Kafka 4.0, the Next Generation of the Consumer Rebalance Protocol (<a href="https://cwiki.apache.org/confluence/x/HhD1D">KIP-848</a>)
is Generally Available (GA). It improves the scalability of consumer groups while simplifying consumers. It also decreases rebalance times, thanks to
its fully incremental design, which no longer relies on a global synchronization barrier.</p>
<p>Starting from Apache Kafka 4.0, Transactions Server Side Defense (<a href="https://cwiki.apache.org/confluence/x/B40ODg">KIP-890</a>)
brings a strengthened transactional protocol. When enabled and using 4.0 producer clients, the producer epoch is bumped on every transaction to ensure every transaction includes the intended messages and duplicates are not written as part of the next transaction.
<p>Starting from Apache Kafka 4.0, Eligible Leader Replicas (<a href="https://cwiki.apache.org/confluence/x/mpOzDw">KIP-966 Part 1</a>)
is available to users as an improvement to Kafka replication. Because the "strict min ISR" rule is generally applied, meaning the high watermark for a data partition can't advance
if the size of the ISR is smaller than the min ISR (<code>min.insync.replicas</code>), some replicas that are not in the ISR are safe to become the leader. The KRaft controller
stores such replicas in the PartitionRecord field called <code>Eligible Leader Replicas</code>. During the leader election, the controller will select the leaders
<p>The server will reject requests with a version it does not support, and will always respond to the client with exactly the protocol format it expects based on the version it included in its request. The intended upgrade path is that new features would first be rolled out on the server (with the older clients not making use of them) and then as newer clients are deployed these new features would gradually be taken advantage of. Note there is an exceptional case while <a href="#api_versions">retrieving supported API versions</a> where the server can respond with a different version.</p>
<p>Note that <a href="https://cwiki.apache.org/confluence/x/OhMyBw">KIP-482 tagged fields</a> can be added to a request without incrementing the version number. This offers an additional way of evolving the message schema without breaking compatibility. Tagged fields do not take up any space when the field is not set. Therefore, if a field is rarely used, it is more efficient to make it a tagged field than to put it in the mandatory schema. However, tagged fields are ignored by recipients that don't know about them, which could pose a challenge if this is not the behavior that the sender wants. In such cases, a version bump may be more appropriate.
<h5 class="anchor-heading"><a id="api_versions" class="anchor-link"></a><a href="#api_versions">Retrieving Supported API versions</a></h5>
<p>In order to work against multiple broker versions, clients need to know what versions of various APIs a
broker supports. The broker exposes this information since 0.10.0.0 as described in <a href="https://cwiki.apache.org/confluence/x/KK6nAw">KIP-35</a>.
Clients should use the supported API versions information to choose the highest API version supported by both client and broker. If no such version
exists, an error should be reported to the user.</p>
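<p>The version-selection rule above can be sketched as picking the top of the overlap of the two supported ranges (the ranges below are made up):</p>

```shell
# Hypothetical supported version ranges for one API key on each side.
client_min=0; client_max=12
broker_min=3; broker_max=9
# The negotiated version is the highest version inside both ranges.
lo=$(( client_min > broker_min ? client_min : broker_min ))
hi=$(( client_max < broker_max ? client_max : broker_max ))
if [ "$lo" -le "$hi" ]; then
  echo "use version $hi"
else
  echo "no common version; report an error" >&2
fi
```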
<p>The following sequence may be used by a client to obtain supported API versions from a broker.</p>
with the <ahref="#protocol_error_codes">error code</a> set to <code>UNSUPPORTED_VERSION</code> and the <code>api_versions</code>
field populated with the supported version of the <code>ApiVersionsRequest</code>. It is then up to the client to retry, making
another <code>ApiVersionsRequest</code> using the highest version supported by the client and broker.
See <a href="https://cwiki.apache.org/confluence/x/qRJ4Bw">KIP-511: Collect and Expose Client's Name and Version in the Brokers</a></li>
<li>If multiple versions of an API are supported by broker and client, clients are recommended to use the latest version supported
by the broker and itself.</li>
<li>Deprecation of a protocol version is done by marking an API version as deprecated in the protocol documentation.</li>
<p>Delegation token based authentication is a lightweight authentication mechanism to complement existing SASL/SSL
methods. Delegation tokens are shared secrets between Kafka brokers and clients. Delegation tokens will help processing
frameworks to distribute the workload to available workers in a secure environment without the added cost of distributing
Kerberos TGT/keytabs or keystores when 2-way SSL is used. See <a href="https://cwiki.apache.org/confluence/x/tfmnAw">KIP-48</a>
for more details.</p>
Under the default implementation of <code>principal.builder.class</code>, the owner of delegation token is used as the authenticated <code>Principal</code> for configuration of ACLs etc.
Kafka ACLs are defined in the general format of "Principal {P} is [Allowed|Denied] Operation {O} From Host {H} on any Resource {R} matching ResourcePattern {RP}".
You can read more about the ACL structure in <a href="https://cwiki.apache.org/confluence/x/XIUWAw">KIP-11</a> and
resource patterns in <a href="https://cwiki.apache.org/confluence/x/QpvLB">KIP-290</a>.
In order to add, remove, or list ACLs, you can use the Kafka ACL CLI <code>kafka-acls.sh</code>.
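<p>For example, granting read access on one topic to a hypothetical principal (the user and topic names are illustrative):</p>

```
$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
    --allow-principal User:alice \
    --operation Read --topic my-topic
```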
<h5><u>Behavior Without ACLs:</u></h5>
<p>If a resource (R) does not have any ACLs defined, meaning that no ACL matches the resource, Kafka will restrict
and the related upstream documentation at <a href="#security_delegation_token">Authentication using Delegation Tokens</a>.</li>
<li><b>User:</b> CreateToken and DescribeToken operations can be granted to User resources to allow creating and describing
tokens for other users. More info can be found in <a href="https://cwiki.apache.org/confluence/x/cwOQBQ">KIP-373</a>.</li>
</ul>
<h5 class="anchor-heading"><a id="operations_resources_and_protocols" class="anchor-link"></a><a href="#operations_resources_and_protocols">Operations and Resources on Protocols</a></h5>
<p>In the below table we'll list the valid operations on resources that are executed by the Kafka API protocols.</p>
Starting more stream threads or more instances of the application merely amounts to replicating the topology and having it process a different subset of Kafka partitions, effectively parallelizing processing.
It is worth noting that there is no shared state amongst the threads, so no inter-thread coordination is necessary. This makes it very simple to run topologies in parallel across the application instances and threads.
The assignment of Kafka topic partitions amongst the various stream threads is transparently handled by Kafka Streams leveraging <a href="https://cwiki.apache.org/confluence/x/foynAw">Kafka's coordination</a> functionality.
More specifically, it guarantees that for any record read from the source Kafka topics, its processing results will be reflected exactly once in the output Kafka topic as well as in the state stores for stateful operations.
Note the key difference between Kafka Streams' end-to-end exactly-once guarantee and other stream processing frameworks' claimed guarantees is that Kafka Streams tightly integrates with the underlying Kafka storage system and ensures that
commits on the input topic offsets, updates on the state stores, and writes to the output topics will be completed atomically, instead of treating Kafka as an external system that may have side-effects.
For more information on how this is done inside Kafka Streams, see <a href="https://cwiki.apache.org/confluence/x/0okYB">KIP-129</a>.<br/>
As of the 2.6.0 release, Kafka Streams supports an improved implementation of exactly-once processing, named "exactly-once v2",
which requires broker version 2.5.0 or newer.
@ -298,7 +298,7 @@
As of the 3.0.0 release, the first version of exactly-once has been deprecated. Users are encouraged to use exactly-once v2 for
exactly-once processing from now on, and prepare by upgrading their brokers if necessary.
For more information on how this is done inside the brokers and Kafka Streams, see
This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer
client since
<a class="reference external" href="https://cwiki.apache.org/confluence/x/eaSnAw">Kafka version 0.10</a>.
Depending on the setting of Kafka’s server-side <code class="docutils literal"><span class="pre">log.message.timestamp.type</span></code> broker and <code class="docutils literal"><span class="pre">message.timestamp.type</span></code> topic parameters,
dynamically during application runtime without any downtime or data loss. This makes your applications
resilient in the face of failures and allows you to perform maintenance as needed (e.g. rolling upgrades).</p>
<p>For more information about this elasticity, see the <a class="reference internal" href="../architecture.html#streams_architecture_tasks"><span class="std std-ref">Parallelism Model</span></a> section. Kafka Streams
leverages the Kafka group management functionality, which is built right into the <a class="reference external" href="https://cwiki.apache.org/confluence/x/uxvVAQ">Kafka wire protocol</a>. It is the foundation that enables the
elasticity of Kafka Streams applications: members of a group coordinate and collaborate jointly on the consumption and
processing of data in Kafka. Additionally, Kafka Streams provides stateful processing and allows for fault-tolerant
state in environments where application instances may come and go at any time.</p>
it is recommended to use ACLs on a prefixed resource pattern
to configure access control lists allowing clients to manage all topics and consumer groups starting with this prefix,
as <code class="docutils literal"><span class="pre">--resource-pattern-type prefixed --topic your.application.id --operation All</span></code>
(see <a class="reference external" href="https://cwiki.apache.org/confluence/x/zlOHB">KIP-277</a>
and <a class="reference external" href="https://cwiki.apache.org/confluence/x/QpvLB">KIP-290</a> for details).
can choose whether or not to reuse the source topic based on the <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code>: if you are upgrading from the old <code>KStreamBuilder</code> class and hence you need to change your code to use
the new <code>StreamsBuilder</code>, you should set this config value to <code>StreamsConfig#OPTIMIZE</code> to continue reusing the source topic; if you are upgrading from 1.0 or 1.1 where you are already using <code>StreamsBuilder</code> and hence have already
created a separate changelog topic, you should set this config value to <code>StreamsConfig#NO_OPTIMIZATION</code> when upgrading to {{fullDotVersion}} in order to use that changelog topic for restoring the state store.
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> can be found in <a href="https://cwiki.apache.org/confluence/x/V53LB">KIP-295</a>.
</p>
<h3><a id="streams_api_changes_410" href="#streams_api_changes_410">Streams API changes in 4.1.0</a></h3>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/x/4Y_MEw">KIP-1111</a>
enables you to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores.
This ensures that every internal resource is named before the Kafka Streams application is deployed, which is essential for upgrading your topology.
You can enable this feature via <code>StreamsConfig</code> using the <code>StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG</code> parameter.
<p>
In this release the <code>ClientInstanceIds</code> instance stores the global consumer <code>Uuid</code> under a key of the global stream-thread name appended with <code>"-global-consumer"</code>, where before the key was only the global stream-thread name.
</p>
<p>
In this release two configs <code>default.deserialization.exception.handler</code> and <code>default.production.exception.handler</code> are deprecated, as they don't have any overwrites.
This release also continues the clean-up of the old Processor API by deprecating <code>MockProcessorContext</code>, <code>Transformer</code>,
<code>TransformerSupplier</code>, <code>ValueTransformer</code>, and <code>ValueTransformerSupplier</code>.
</p>
<p>
Previously, the <code>ProductionExceptionHandler</code> was not invoked on a (retriable) <code>TimeoutException</code>. With Kafka Streams 4.0, the handler is called, and the default handler returns <code>RETRY</code> to not change existing behavior.
However, a custom handler can now decide to break the infinite retry loop by returning either <code>CONTINUE</code> or <code>FAIL</code> (<a href="https://cwiki.apache.org/confluence/x/LQ6TEg">KIP-1065</a>).
</p>
<p>
In this release, Kafka Streams metrics can be collected broker-side via the KIP-714 broker plugin.
For more detailed information, refer to <a href="https://cwiki.apache.org/confluence/x/XA-OEg">KIP-1076</a>.
This release also deprecates the <code>ForeachProcessor</code> class.
This change is aimed at improving the organization and clarity of the Kafka Streams API by ensuring that internal classes are not exposed in public packages.
</p>
<p>
<a href="https://cwiki.apache.org/confluence/x/hg-OEg">KIP-1078</a> deprecates the leaking getter methods in the <code>Joined</code> helper class.
These methods are deprecated without a replacement for future removal, as they don't add any value to Kafka Streams users.
</p>
<p>
To ensure better encapsulation and organization of configuration documentation within Kafka Streams,
this release deprecates certain public doc description variables that are only used within the <code>StreamsConfig</code> or <code>TopologyConfig</code> classes.
Additionally, the unused variable <code>DUMMY_THREAD_INDEX</code> is also deprecated.
</p>
<p>
Due to the removal of the already deprecated <code>#through</code> method in Kafka Streams, the <code>intermediateTopicsOption</code> of the <code>StreamsResetter</code> tool in Apache Kafka is
no longer needed and is therefore deprecated (<a href="https://cwiki.apache.org/confluence/x/Vo39Eg">KIP-1087</a>).
</p>
<p>
Since string metrics cannot be collected on the broker side (KIP-714), <a href="https://cwiki.apache.org/confluence/x/IgstEw">KIP-1091</a>
introduces numeric counterparts to allow proper broker-side metric collection for Kafka Streams applications.
These metrics will be available at the <code>INFO</code> recording level, and a thread-level metric with a String value will be available for users leveraging Java Management Extensions (<code>JMX</code>).
</p>
</p>
<p>
With the introduction of <a href="https://cwiki.apache.org/confluence/x/NIyMEw">KIP-1106</a>,
the existing <code>Topology.AutoOffsetReset</code> is deprecated and replaced with a new class <code>org.apache.kafka.streams.AutoOffsetReset</code> to capture the reset strategies.
New methods will be added to the <code>org.apache.kafka.streams.Topology</code> and <code>org.apache.kafka.streams.kstream.Consumed</code> classes to support the new reset strategy.
These changes aim to provide more flexibility and efficiency in managing offsets, especially in scenarios involving long-term storage and infinite retention.
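<p>
As an illustrative sketch only (assuming the kafka-streams artifact on the classpath; the factory method names shown, such as <code>byDuration()</code>, follow the KIP proposal and may differ in the released API):
</p>

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.AutoOffsetReset;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

// Sketch: reset to offsets no older than 7 days instead of only earliest/latest.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic",
    Consumed.with(Serdes.String(), Serdes.String())
            .withOffsetResetPolicy(AutoOffsetReset.byDuration(Duration.ofDays(7))));
```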
<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/x/xQniEQ">KIP-1033</a>
enables you to provide a processing exception handler to manage exceptions during the processing of a record rather than throwing the exception all the way out of your streams application.
You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
The specified handler must implement the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.
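<p>
A minimal handler sketch (assuming the kafka-streams artifact on the classpath; the callback shape follows KIP-1033 and the class and topic names here are illustrative):
</p>

```java
import java.util.Map;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

// Sketch: log the failed record and keep processing instead of crashing the thread.
public class LogAndContinueProcessingHandler implements ProcessingExceptionHandler {
    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        System.err.println("Skipping record due to: " + exception);
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
// Register via:
// props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
//           LogAndContinueProcessingHandler.class);
```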
<p>
Kafka Streams now allows customizing the logging interval of the stream-thread runtime summary via the newly added config <code>log.summary.interval.ms</code>.
By default, the summary is logged every 2 minutes.
Users can also provide a customized implementation of the newly added <code>StandbyUpdateListener</code> interface to continuously monitor changes to standby tasks.
</p>
<p>
IQv2 supports <code>RangeQuery</code> that allows to specify unbounded, bounded, or half-open key-ranges, which return data in unordered (byte[]-lexicographical) order (per partition).
<a href="https://cwiki.apache.org/confluence/x/eKCzDw">KIP-985</a> extends this functionality by adding <code>.withDescendingKeys()</code> and <code>.withAscendingKeys()</code> to allow users to receive data in descending or ascending order.
</p>
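<p>
A short sketch of the ordered range query (assuming the kafka-streams artifact on the classpath; the store name, key type, and bounds are illustrative):
</p>

```java
import org.apache.kafka.streams.query.RangeQuery;

// Sketch: a bounded key range, returned in descending key order per partition.
RangeQuery<String, Long> query =
    RangeQuery.<String, Long>withRange("a", "f").withDescendingKeys();
// Execute against a running application via
// KafkaStreams#query(StateQueryRequest.inStore("my-store").withQuery(query)).
```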
<p>
<a href="https://cwiki.apache.org/confluence/x/TYxEE">KIP-992</a> adds two new query types,
namely <code>TimestampedKeyQuery</code> and <code>TimestampedRangeQuery</code>. Both should be used to query a timestamped key-value store, to retrieve a <code>ValueAndTimestamp</code> result.
The existing <code>KeyQuery</code> and <code>RangeQuery</code> are changed to always return the value only for timestamped key-value stores.
</p>
<p>
IQv2 adds support for <code>MultiVersionedKeyQuery</code> (introduced in <a href="https://cwiki.apache.org/confluence/x/WpSzDw">KIP-968</a>)
that allows retrieving a set of records from a versioned state store for a given key and a specified time range.
Users have to use <code>fromTime(Instant)</code> and/or <code>toTime(Instant)</code> to specify a half or a complete time range.
</p>
<p>
IQv2 adds support for <code>VersionedKeyQuery</code> (introduced in <a href="https://cwiki.apache.org/confluence/x/qo_zDw">KIP-960</a>)
that allows retrieving a single record from a versioned state store based on its key and timestamp.
Users have to use the <code>asOf(Instant)</code> method to define a query that returns the record's version for the specified timestamp.
To be more precise, the key query returns the record with the greatest timestamp <code>&lt;= Instant</code>.
</p>
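<p>
A sketch of the point-in-time lookup (assuming the kafka-streams artifact on the classpath; the key and timestamp are illustrative):
</p>

```java
import java.time.Instant;
import org.apache.kafka.streams.query.VersionedKeyQuery;

// Sketch: ask for the version of "user-42" that was valid at the given instant,
// i.e. the record with the greatest timestamp <= asOf.
VersionedKeyQuery<String, Long> query =
    VersionedKeyQuery.<String, Long>withKey("user-42")
                     .asOf(Instant.parse("2024-01-01T00:00:00Z"));
```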
<p>
The non-null key requirements for Kafka Streams join operators were relaxed as part of <a href="https://cwiki.apache.org/confluence/x/f5CzDw">KIP-962</a>.
The behavior of the following operators changed.
<ul>
<li>left join KStream-KStream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.</li>
<h3><a id="streams_api_changes_360" href="#streams_api_changes_360">Streams API changes in 3.6.0</a></h3>
<p>
Rack aware task assignment was introduced in <a href="https://cwiki.apache.org/confluence/x/CQ40Dw">KIP-925</a>.
Rack aware task assignment can be enabled for <code>StickyTaskAssignor</code> or <code>HighAvailabilityTaskAssignor</code> to compute task assignments which can minimize cross rack traffic under certain conditions.
For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
</p>
<p>
IQv2 supports a <code>RangeQuery</code> that allows to specify unbounded, bounded, or half-open key-ranges. Users have to use <code>withUpperBound(K)</code>, <code>withLowerBound(K)</code>,
or <code>withNoBounds()</code> to specify half-open or unbounded ranges, but cannot use <code>withRange(K lower, K upper)</code> for the same.
<a href="https://cwiki.apache.org/confluence/x/_Rk0Dw">KIP-941</a> closes this gap by allowing to pass in <code>null</code>
as upper and lower bound (with semantics "no bound") to simplify the usage of the <code>RangeQuery</code> class.
</p>
<p>
KStream-to-KTable joins now have an option for adding a grace period.
The grace period is enabled on the <code>Joined</code> object using the <code>withGracePeriod()</code> method.
This change was introduced in <a href="https://cwiki.apache.org/confluence/x/lAs0Dw">KIP-923</a>.
To use the grace period option in the stream-table join, the table must be materialized with a versioned state store.
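<p>
The grace-period option described above can be sketched as follows (assuming the kafka-streams artifact on the classpath; the serdes and duration are illustrative):
</p>

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Joined;

// Sketch: buffer stream records for up to 5 minutes so that late table
// updates (served from the versioned store) can still be joined.
Joined<String, Long, String> joined =
    Joined.<String, Long, String>with(Serdes.String(), Serdes.Long(), Serdes.String())
          .withGracePeriod(Duration.ofMinutes(5));
// Pass `joined` into KStream#join(KTable, ValueJoiner, Joined).
```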
<h3><a id="streams_api_changes_350" href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
<p>
A new state store type, versioned key-value stores, was introduced in
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores">KIP-889</a>.
</p>
<p>
KIP-904 improves the implementation of KTable aggregations. In general, an input KTable update triggers a result refinement for two rows;
however, prior to KIP-904, if both refinements happened to the same result row, two independent updates to the same row were applied, resulting in spurious intermediate results.
KIP-904 allows us to detect this case, and to only apply a single update, avoiding spurious intermediate results.
</p>
<p>
Error handling is improved via <a href="https://cwiki.apache.org/confluence/x/R4nQBQ">KIP-399</a>.
The existing <code>ProductionExceptionHandler</code> now also covers serialization errors.
</p>
<p>
This release also introduces "emit strategies", which are built into the aggregation operator directly to use the already existing
RocksDB store. <code>TimeWindowedKStream.emitStrategy(EmitStrategy)</code> and
<code>SessionWindowedKStream.emitStrategy(EmitStrategy)</code> allow picking between "emit on window update" (default)
and "emit on window close".
</p>
<h3><a id="streams_api_changes_320" href="#streams_api_changes_320">Streams API changes in 3.2.0</a></h3>
<p>
RocksDB offers many metrics which are critical to monitor and tune its performance. Kafka Streams started to make RocksDB metrics accessible
like any other Kafka metric via <a href="https://cwiki.apache.org/confluence/x/A5LiBg">KIP-471</a> in the 2.4.0 release.
However, the KIP was only partially implemented, and is now completed with the 3.2.0 release.
For a full list of available RocksDB metrics, please consult the <a href="/{{version}}/documentation/#kafka_streams_client_monitoring">monitoring documentation</a>.
</p>
Kafka Streams ships with RocksDB and in-memory store implementations and users can pick which one to use.
However, for the DSL, the choice is a per-operator one, making it cumbersome to switch from the default RocksDB
store to in-memory store for all operators, especially for larger topologies.
Previously, if an input table was partitioned by a non-default partitioner, joining records might fail.
With KIP-775 you now can pass a custom <code>StreamPartitioner</code> into the join using the newly added
<code>TableJoined</code> object.
<li><code>InvalidStateStorePartitionException</code>: If the specified partition does not exist, an <code>InvalidStateStorePartitionException</code> will be thrown.</li>
</ul>
<p>
See <a href="https://cwiki.apache.org/confluence/x/0JpzB">KIP-216</a> for more information.
</p>
<p>
We deprecated the StreamsConfig <code>processing.guarantee</code> configuration value <code>"exactly_once"</code> (for EOS version 1) in favor of the improved EOS version 2, formerly configured via
when playing around with Kafka Streams for the first time. Note that using the new APIs for the
<code>JoinWindows</code> class will also enable a fix for spurious left/outer join results, as described in
the following paragraph.
Additionally, in older versions Kafka Streams emitted stream-stream left/outer join results eagerly. This behavior may lead to spurious left/outer join result records.
We removed the following deprecated APIs:
</p>
<ul>
<li><code>--zookeeper</code> flag of the application reset tool: deprecated in Kafka 1.0.0 (<a href="https://cwiki.apache.org/confluence/x/6J1jB">KIP-198</a>).</li>
<li><code>--execute</code> flag of the application reset tool: deprecated in Kafka 1.1.0 (<a href="https://cwiki.apache.org/confluence/x/ApI7B">KIP-171</a>).</li>
<li><code>StreamsBuilder#addGlobalStore</code> (one overload): deprecated in Kafka 1.1.0 (<a href="https://cwiki.apache.org/confluence/x/vKpzB">KIP-233</a>).</li>
<li><code>ProcessorContext#forward</code> (some overloads): deprecated in Kafka 2.0.0 (<a href="https://cwiki.apache.org/confluence/x/Ih6HB">KIP-251</a>).</li>
<li><code>WindowBytesStoreSupplier#segments</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/mQU0BQ">KIP-319</a>).</li>
<li><code>segments, until, maintainMs</code> on <code>TimeWindows</code>, <code>JoinWindows</code>, and <code>SessionWindows</code>: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/sQU0BQ">KIP-328</a>).</li>
<li>Overloaded <code>JoinWindows#of, before, after</code>, <code>SessionWindows#with</code>, <code>TimeWindows#of, advanceBy</code>, <code>UnlimitedWindows#startOn</code> and <code>KafkaStreams#close</code> with <code>long</code> typed parameters: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/IBNPBQ">KIP-358</a>).</li>
<li>Overloaded <code>KStream#groupBy, groupByKey</code> and <code>KTable#groupBy</code> with <code>Serialized</code> parameter: deprecated in Kafka 2.1.0 (<a href="https://cwiki.apache.org/confluence/x/mgJ1BQ">KIP-372</a>).</li>
<li><code>Joined#named, name</code>: deprecated in Kafka 2.3.0 (<a href="https://cwiki.apache.org/confluence/x/xikYBQ">KIP-307</a>).</li>
<li><code>TopologyTestDriver#pipeInput, readOutput</code>, <code>OutputVerifier</code> and <code>ConsumerRecordFactory</code> classes (<a href="https://cwiki.apache.org/confluence/x/tI-iBg">KIP-470</a>).</li>
<li><code>KafkaClientSupplier#getAdminClient</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/x/V9XiBg">KIP-476</a>).</li>
<li>Overloaded <code>KStream#join, leftJoin, outerJoin</code> with <code>KStream</code> and <code>Joined</code> parameters: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/x/EBEgBw">KIP-479</a>).</li>
<li><code>WindowStore#put(K key, V value)</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/x/kcviBg">KIP-474</a>).</li>
<li><code>UsePreviousTimeOnInvalidTimestamp</code>: deprecated in Kafka 2.5.0 as renamed to <code>UsePartitionTimeOnInvalidTimestamp</code> (<a href="https://cwiki.apache.org/confluence/x/BxXABw">KIP-530</a>).</li>
<li>Overloaded <code>KafkaStreams#metadataForKey</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/x/Xg-jBw">KIP-535</a>).</li>
<li>Overloaded <code>KafkaStreams#store</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/x/QYyvC">KIP-562</a>).</li>
</ul>
<p>
The following dependencies were removed from Kafka Streams:
<h3><a id="streams_api_changes_280" href="#streams_api_changes_280">Streams API changes in 2.8.0</a></h3>
<p>
We extended <code>StreamJoined</code> to include the options <code>withLoggingEnabled()</code> and <code>withLoggingDisabled()</code>.
</p>
<p>
We added two new methods to <code>KafkaStreams</code>, namely <code>KafkaStreams#addStreamThread()</code> and <code>KafkaStreams#removeStreamThread()</code>.
The <code>TimeWindowedDeserializer</code> constructor <code>TimeWindowedDeserializer(final Deserializer inner)</code>
was deprecated to encourage users to properly set their window size through <code>TimeWindowedDeserializer(final Deserializer inner, Long windowSize)</code>.
An additional streams config, <code>window.size.ms</code>, was added for users that cannot set the window size through
the constructor, such as when using the console consumer. <a href="https://cwiki.apache.org/confluence/x/aDR4CQ">KIP-659</a>
has more details.
</p>
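<p>
As a sketch, the window size can be supplied through consumer configuration; <code>window.size.ms</code> is the config key named above, and the value here is illustrative:
</p>

```java
import java.util.Properties;

// Sketch for consumers that cannot call the two-argument constructor
// (e.g. the console consumer): pass the window size in milliseconds as config.
Properties consumerProps = new Properties();
consumerProps.put("window.size.ms", "500");
```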
<p>
To simplify testing, two new constructors that don't require a <code>Properties</code> parameter have been
added to the <code>TopologyTestDriver</code> class. If <code>Properties</code> are passed
into the constructor, it is no longer required to set mandatory configuration parameters.
</p>
<p>
We changed the default value of <code>default.key.serde</code> and <code>default.value.serde</code> to be <code>null</code> instead of <code>ByteArraySerde</code>.
Users will now see a <code>ConfigException</code> if their serdes are not correctly configured through those configs or passed in explicitly.
See <a href="https://cwiki.apache.org/confluence/x/bIbOCg">KIP-741</a> for more details.
</p>
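<p>
A minimal configuration sketch; <code>default.key.serde</code> and <code>default.value.serde</code> are the keys behind <code>StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG</code> and <code>StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG</code>, and the serde class names shown are the built-in ones:
</p>

```java
import java.util.Properties;

// Sketch: since the defaults are now null, configure serdes explicitly
// (or pass them per operator) to avoid a ConfigException at runtime.
Properties props = new Properties();
props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$LongSerde");
```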
<h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
<code>KeyQueryMetadata</code> was introduced in the Kafka Streams 2.5 release with getter methods having the prefix <code>get</code>.
The intent of this change is to align the method names with the Kafka convention of not using the <code>get</code> prefix for getter methods.
The old methods are deprecated, and their behavior is not affected.
You can enable the improved exactly-once processing by setting the configuration parameter <code>processing.guarantee</code> to the
new value <code>"exactly_once_beta"</code>.
Note that you need brokers with version 2.5 or newer to use this feature.
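<p>
A minimal configuration sketch; <code>processing.guarantee</code> is the key behind <code>StreamsConfig#PROCESSING_GUARANTEE_CONFIG</code>:
</p>

```java
import java.util.Properties;

// Sketch: opt into the improved exactly-once implementation.
// Requires brokers with version 2.5 or newer.
Properties props = new Properties();
props.put("processing.guarantee", "exactly_once_beta");
```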
</p>
<p>
As of 2.6.0 Kafka Streams deprecates <code>KStream.through()</code> in favor of the new <code>KStream.repartition()</code> operator
(as per <a href="https://cwiki.apache.org/confluence/x/i55zB">KIP-221</a>).
<code>KStream.repartition()</code> is similar to <code>KStream.through()</code>, however Kafka Streams will manage the topic for you.
If you need to write into and read back from a topic that you manage, you can fall back to use <code>KStream.to()</code> in combination with <code>StreamsBuilder#stream()</code>.
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details about <code>KStream.repartition()</code>.
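<p>
A sketch of the migration (assuming the kafka-streams artifact on the classpath; the topic names and partition count are illustrative):
</p>

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

// Sketch: instead of stream.through("user-managed-topic"), let Kafka Streams
// create and manage the internal repartition topic itself.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input");
KStream<String, String> repartitioned =
    stream.repartition(Repartitioned.<String, String>as("my-repartition")
                                    .withNumberOfPartitions(6));
```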
</p>
<p>
We added a <code>--force</code> option in StreamsResetter to force remove left-over members on the broker side when a long session timeout was configured,
as per <a href="https://cwiki.apache.org/confluence/x/8I7JC">KIP-571</a>.
</p>
<p>
We added <code>Suppressed.withLoggingDisabled()</code> and <code>Suppressed.withLoggingEnabled(config)</code>
methods to allow disabling or configuring the changelog topic of the suppression buffer,
as per <a href="https://cwiki.apache.org/confluence/x/RBiGBg">KIP-446</a>.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_250" class="anchor-link"></a><a href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
<p>
We added a new <code>cogroup()</code> operator (via <a href="https://cwiki.apache.org/confluence/x/YxcjB">KIP-150</a>)
that allows aggregating multiple streams in a single operation.
Cogrouped streams can also be windowed before they are aggregated.
Please refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details.
</p>
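<p>
The cogroup operator described above can be sketched as follows (assuming the kafka-streams artifact on the classpath; the topic names and aggregation logic are illustrative):
</p>

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;

// Sketch: sum per-key counts from two pre-grouped streams in one aggregation.
StreamsBuilder builder = new StreamsBuilder();
KGroupedStream<String, Long> clicks =
    builder.<String, Long>stream("clicks").groupByKey();
KGroupedStream<String, Long> views =
    builder.<String, Long>stream("views").groupByKey();

KTable<String, Long> totals = clicks
    .cogroup((key, value, aggregate) -> aggregate + value)
    .cogroup(views, (key, value, aggregate) -> aggregate + value)
    .aggregate(() -> 0L);
```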
<p>
We added a new <code>KStream.toTable()</code> API to translate an input event stream into a changelog stream.
</p>
<p>
We added a new Serde type <code>Void</code> in <a href="https://cwiki.apache.org/confluence/x/3QvABw">KIP-527</a> to represent
null keys or null values from input topic.
</p>
<p>
Deprecated <code>UsePreviousTimeOnInvalidTimestamp</code> and replaced it with <code>UsePartitionTimeOnInvalidTimeStamp</code> as per <a href="https://cwiki.apache.org/confluence/x/BxXABw">KIP-530</a>.
</p>
<p>
Deprecated <code>KafkaStreams.store(String, QueryableStoreType)</code> and replaced it with <code>KafkaStreams.store(StoreQueryParameters)</code> to allow querying
for a store with a variety of parameters, including querying a specific task and stale stores, as per
<a href="https://cwiki.apache.org/confluence/x/QYyvC">KIP-562</a> and
<a href="https://cwiki.apache.org/confluence/x/Xg-jBw">KIP-535</a>.
</p>
<h3 class="anchor-heading"><a id="streams_api_changes_240" class="anchor-link"></a><a href="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
<p>
As of 2.4.0 Kafka Streams offers a KTable-KTable foreign-key join (as per <a href="https://cwiki.apache.org/confluence/x/pJlzB">KIP-213</a>).
This joiner allows for records to be joined between two KTables with different keys.
Both <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#ktable-ktable-fk-join">INNER and LEFT foreign-key joins</a>
are supported.
</p>
<p>
In the 2.4 release, you can now name all operators in a Kafka Streams DSL topology via <a href="https://cwiki.apache.org/confluence/x/xikYBQ">KIP-307</a>.
</p>
<p>
With the introduction of incremental cooperative rebalancing, Streams no longer requires all tasks be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance only those tasks which are to be migrated to another consumer
for overall load balance will need to be closed and revoked. This changes the semantics of the <code>StateListener</code> a bit, as it will not necessarily transition to <code>REBALANCING</code> at the beginning of a rebalance anymore. Note that
this means IQ will now be available at all times except during state restoration, including while a rebalance is in progress. If restoration is occurring when a rebalance begins, we will continue to actively restore the state stores and/or process
standby tasks during a cooperative rebalance. Note that with this new rebalancing protocol, you may sometimes see a rebalance be followed by a second short rebalance that ensures all tasks are safely distributed.
The <code>PartitionGrouper</code> interface is deprecated and will be removed in the next major release (<a href="https://issues.apache.org/jira/browse/KAFKA-7785">KAFKA-7785</a>).
Hence, this feature won't be supported in the future any longer and you need to update your code accordingly.
If you use a custom <code>PartitionGrouper</code> and stop using it, the created tasks might change.
<p>Version 2.3.0 adds the Suppress operator to the <code>kafka-streams-scala</code> KTable API.</p>
<p>
As of 2.3.0 Streams now offers an in-memory version of the window (<a href="https://cwiki.apache.org/confluence/x/6AQlBg">KIP-428</a>)
and the session (<a href="https://cwiki.apache.org/confluence/x/DiqGBg">KIP-445</a>) store, in addition to the persistent ones based on RocksDB.
The new public interfaces <code>inMemoryWindowStore()</code> and <code>inMemorySessionStore()</code> are added to <code>Stores</code> and provide the built-in in-memory window or session store.
</p>
<p>
In 2.3.0 we have added default implementations of <code>close()</code> and <code>configure()</code> to <code>Serializer</code>,
<code>Deserializer</code>, and <code>Serde</code> so that they can be implemented as lambda expressions.
For more details please read <a href="https://cwiki.apache.org/confluence/x/fgw0BQ">KIP-331</a>.
</p>
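With these default implementations in place, a custom serde can be built from lambdas alone. A minimal sketch (the topic name and the four-byte int layout are illustrative, not from the original text):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class LambdaSerdeExample {
    public static void main(String[] args) {
        // With default close()/configure(), serialize(...) is the only abstract
        // method left on Serializer, so a lambda is enough:
        Serializer<Integer> ser = (topic, value) ->
                value == null ? null : ByteBuffer.allocate(4).putInt(value).array();
        Deserializer<Integer> de = (topic, bytes) ->
                bytes == null ? null : ByteBuffer.wrap(bytes).getInt();

        Serde<Integer> serde = Serdes.serdeFrom(ser, de);
        byte[] bytes = serde.serializer().serialize("some-topic", 42);
        System.out.println(serde.deserializer().deserialize("some-topic", bytes)); // prints 42
    }
}
```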
<p>
<code>java.lang.ClassCastException: class org.apache.kafka.streams.state.ValueAndTimestamp cannot be cast to class YOUR-VALUE-TYPE</code>
upon getting a value from the store.</strong>
Additionally, <code>TopologyTestDriver#getStateStore()</code> only returns non-built-in stores and throws an exception if a built-in store is accessed.
For more details please read <a href="https://cwiki.apache.org/confluence/x/0j6HB">KIP-258</a>.
</p>
<p>
To improve type safety, a new operator <code>KStream#flatTransformValues</code> is added.
For more details please read <a href="https://cwiki.apache.org/confluence/x/bUgYBQ">KIP-313</a>.
</p>
<p>
Kafka Streams used to set the configuration parameter <code>max.poll.interval.ms</code> to <code>Integer.MAX_VALUE</code>.
This default value has been removed, and Kafka Streams now uses the consumer's default value.
For more details please read <a href="https://cwiki.apache.org/confluence/x/1COGBg">KIP-442</a>.
</p>
<p>
The segment size for index files (<code>segment.index.bytes</code>) is no longer 50MB, but uses the cluster default.
Similarly, the configuration <code>segment.ms</code> is no longer 10 minutes, but uses the cluster default configuration.
Lastly, the retention period (<code>retention.ms</code>) is changed from <code>Long.MAX_VALUE</code> to <code>-1</code> (infinite).
For more details please read <a href="https://cwiki.apache.org/confluence/x/4iOGBg">KIP-443</a>.
</p>
<p>
To avoid memory leaks, <code>RocksDBConfigSetter</code> has a new <code>close()</code> method that is called on shutdown.
Users should implement this method to release any memory used by RocksDB config objects, by closing those objects.
For more details please read <a href="https://cwiki.apache.org/confluence/x/QhaZBg">KIP-453</a>.
</p>
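A sketch of a config setter that owns a native object and releases it in <code>close()</code>. The cache size is illustrative, and the <code>BlockBasedTableConfig#setBlockCache</code> call assumes a RocksDB version that exposes it:

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class BoundedMemoryConfigSetter implements RocksDBConfigSetter {
    // Objects created here hold native memory that RocksDB does not free for us,
    // so keep a reference and close it explicitly.
    private Cache cache;

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        cache = new LRUCache(16 * 1024 * 1024L); // hypothetical 16 MiB block cache
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(cache);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Called on shutdown since KIP-453; release native memory here.
        cache.close();
    }
}
```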
<p>
<p>
In <code>WindowedSerdes</code>, we've added a new static constructor to return a <code>TimeWindowSerde</code> with configurable window size. This is to help users to construct time window serdes to read directly from a time-windowed store's changelog.
More details can be found in <a href="https://cwiki.apache.org/confluence/x/WYTQBQ">KIP-393</a>.
</p>
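A sketch of constructing the new serde; the window size value is illustrative and must match the one used by the windowed store, and the two-argument overload is assumed from KIP-393:

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public class WindowedSerdeExample {
    public static void main(String[] args) {
        long windowSizeMs = 5 * 60 * 1000L; // hypothetical 5-minute windows

        // The configurable window size lets the deserializer compute the
        // window end time correctly when reading a time-windowed store's
        // changelog topic directly.
        Serde<Windowed<String>> windowedSerde =
                WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSizeMs);

        // windowedSerde can now be passed to Consumed.with(...) when reading
        // the changelog topic as a stream.
    }
}
```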
<p>
In 2.2.0 we have extended a few public interfaces including <code>KafkaStreams</code> to extend <code>AutoCloseable</code> so that they can be
used in a try-with-resource statement. For a full list of public interfaces that get impacted please read <a href="https://cwiki.apache.org/confluence/x/-AeQBQ">KIP-376</a>.
</p>
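For example, a <code>KafkaStreams</code> instance can now be scoped with try-with-resources. The topic names and config values below are hypothetical:

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class TryWithResourcesExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // hypothetical topics
        Topology topology = builder.build();

        Properties props = new Properties();
        props.put("application.id", "try-with-resources-demo");
        props.put("bootstrap.servers", "localhost:9092");

        // KafkaStreams extends AutoCloseable, so close() is invoked
        // automatically when the block exits, even on exceptions.
        try (KafkaStreams streams = new KafkaStreams(topology, props)) {
            streams.start();
            // ... run until some shutdown condition ...
        }
    }
}
```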
<h3 class="anchor-heading"><a id="streams_api_changes_210" class="anchor-link"></a><a href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
Users are encouraged to use <code>#topicSet()</code> and <code>#topicPattern()</code> accordingly on <code>TopologyDescription.Source</code> nodes,
instead of using <code>#topics()</code>, which has since been deprecated. Similarly, use <code>#topic()</code> and <code>#topicNameExtractor()</code>
to get descriptions of <code>TopologyDescription.Sink</code> nodes.
Additionally, we've updated the <code>Joined</code> class with a new method <code>Joined#withName</code>
enabling users to name any repartition topics required for performing Stream/Stream or Stream/Table joins. For more details on repartition
topic naming, see <a href="https://cwiki.apache.org/confluence/x/mgJ1BQ">KIP-372</a>.
As a result we've updated the Kafka Streams Scala API and removed the <code>Serialized</code> class in favor of adding <code>Grouped</code>.
If you just rely on the implicit <code>Serialized</code>, you only need to recompile; if you pass in <code>Serialized</code> explicitly, you will have to make code changes.
<p>
We added a new serde for UUIDs (<code>Serdes.UUIDSerde</code>) that you can use via <code>Serdes.UUID()</code>.
The <code>Window</code> class has new methods <code>#startTime()</code> and <code>#endTime()</code> that return window start/end timestamp as <code>Instant</code>.
For interactive queries, there are new <code>#fetch(...)</code> overloads taking <code>Instant</code> arguments.
Additionally, punctuations are now registered via <code>ProcessorContext#schedule(Duration interval, ...)</code>.
For more details, see <a href="https://cwiki.apache.org/confluence/x/IBNPBQ">KIP-358</a>.
</p>
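A sketch of the Duration-based punctuation registration inside a processor; the 30-second interval and the commit action are illustrative:

```java
import java.time.Duration;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class PeriodicPunctuator extends AbstractProcessor<String, String> {
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // Duration-based overload from KIP-358; the punctuator fires every
        // 30 seconds of wall-clock time.
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context().commit());
    }

    @Override
    public void process(final String key, final String value) {
        // per-record processing logic goes here
    }
}
```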
<p>
We deprecated <code>KafkaStreams#close(...)</code> and replaced it with <code>KafkaStreams#close(Duration)</code> that accepts a single timeout argument.
Note: the new <code>#close(Duration)</code> method has improved (but slightly different) semantics.
For more details, see <a href="https://cwiki.apache.org/confluence/x/IBNPBQ">KIP-358</a>.
</p>
<p>
The newly exposed <code>AdminClient</code> metrics are now available when calling the <code>KafkaStreams#metrics()</code> method.
For more details on exposing <code>AdminClient</code> metrics,
see <a href="https://cwiki.apache.org/confluence/x/lQg0BQ">KIP-324</a>.
</p>
<p>
Similarly, <code>WindowBytesStoreSupplier#segments()</code> was deprecated and replaced with <code>WindowBytesStoreSupplier#segmentInterval()</code>.
If you implement a custom window store, you need to update your code accordingly.
Finally, <code>Stores#persistentWindowStore(...)</code> was deprecated and replaced with a new overload that no longer allows specifying the number of segments.
For more details, see <a href="https://cwiki.apache.org/confluence/x/mQU0BQ">KIP-319</a>
(note: <a href="https://cwiki.apache.org/confluence/x/sQU0BQ">KIP-328</a> and
<a href="https://cwiki.apache.org/confluence/x/IBNPBQ">KIP-358</a> 'overlap' with KIP-319).
</p>
<p>
reusing the source topic as the changelog topic, the topology may be optimized to merge redundant repartition topics into one
repartition topic. The original no-parameter version of <code>StreamsBuilder#build</code> is still available for those who wish to not
optimize their topology. Note that enabling optimization of the topology may require you to do an application reset when redeploying the application. For more
details, see <a href="https://cwiki.apache.org/confluence/x/CkcYBQ">KIP-312</a>.
</p>
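A sketch of opting in to topology optimization, using the string form of the config key (equivalent to <code>StreamsConfig.TOPOLOGY_OPTIMIZATION</code> with value <code>"all"</code>); the topic name is hypothetical:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class OptimizedBuildExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "optimized-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // String form of StreamsConfig.TOPOLOGY_OPTIMIZATION; "all" enables it.
        props.put("topology.optimization", "all");

        StreamsBuilder builder = new StreamsBuilder();
        builder.table("input-topic"); // hypothetical topic

        // Passing the props lets the builder apply optimizations such as
        // reusing the source topic as the KTable changelog.
        Topology topology = builder.build(props);
        System.out.println(topology.describe());
    }
}
```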
<p>
<code>skipped-records-rate</code> and <code>skipped-records-total</code>. When a record is skipped, the event is
now logged at WARN level. If these warnings become burdensome, we recommend explicitly filtering out unprocessable
records instead of depending on record skipping semantics.
As of right now, the potential causes of skipped records are:
</p>
<ul>
<p>
We have added support for methods in <code>ReadOnlyWindowStore</code> which allow for querying a single window's key-value pair.
Users who have customized window store implementations of the above interface need to update their code to implement the newly added method as well.
For more details, see <a href="https://cwiki.apache.org/confluence/x/UUSHB">KIP-261</a>.
</p>
<p>
We have added public <code>WindowedSerdes</code> to allow users to read from / write to a topic storing windowed table changelogs directly.
In addition, in <code>StreamsConfig</code> we have also added <code>default.windowed.key.serde.inner</code> and <code>default.windowed.value.serde.inner</code>
to let users specify inner serdes if the default serde classes are windowed serdes.
For more details, see <a href="https://cwiki.apache.org/confluence/x/_keHB">KIP-265</a>.
</p>
<p>
We've added message header support in the <code>Processor API</code> in Kafka 2.0.0. In particular, we have added a new API <code>ProcessorContext#headers()</code>
<p>
We have deprecated constructors of <code>KafkaStreams</code> that take a <code>StreamsConfig</code> as parameter.
Please use the other corresponding constructors that accept <code>java.util.Properties</code> instead.
For more details, see <a href="https://cwiki.apache.org/confluence/x/KLRzB">KIP-245</a>.
</p>
<p>
Kafka 2.0.0 allows manipulating the timestamps of output records using the Processor API (<a href="https://cwiki.apache.org/confluence/x/Ih6HB">KIP-251</a>).
To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
The two existing overloads <code>#forward(Object key, Object value, String childName)</code> and <code>#forward(Object key, Object value, int childIndex)</code> were deprecated and a new overload <code>#forward(Object key, Object value, To to)</code> was added.
The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
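A sketch of the new overload inside a processor; the child processor name "Sink" and the timestamp value are hypothetical:

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class ForwardWithToExample extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        // Send to one specific child by name and set the output record
        // timestamp ("Sink" and 42L are hypothetical values).
        context().forward(key, value, To.child("Sink").withTimestamp(42L));

        // Or send to all downstream processors, keeping the input timestamp:
        context().forward(key, value, To.all());
    }
}
```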
automatic conversion between Java and Scala collection types, a way
to implicitly provide Serdes to reduce boilerplate from your application and make it more typesafe, and more! For more information see the
<a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">Kafka Streams DSL for Scala documentation</a>.
</p>
<p>
We have added support for methods in <code>ReadOnlyWindowStore</code> which allow for querying <code>WindowStore</code>s without the necessity of providing keys.
Users who have customized window store implementations of the above interface need to update their code to implement the newly added method as well.
For more details, see <a href="https://cwiki.apache.org/confluence/x/6qdjB">KIP-205</a>.
</p>
<p>
There is a new artifact <code>kafka-streams-test-utils</code> providing a <code>TopologyTestDriver</code>, <code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code> class.
You can include the new artifact as a regular dependency to your unit tests and use the test driver to test the business logic of your Kafka Streams application.
For more details, see <a href="https://cwiki.apache.org/confluence/x/EQOHB">KIP-247</a>.
</p>
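A sketch of the test-driver workflow; the topology, topic names, and config values are illustrative:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class TopologyTestDriverExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .mapValues(v -> v.toUpperCase())
               .to("output-topic");

        Properties props = new Properties();
        props.put("application.id", "test");
        props.put("bootstrap.servers", "dummy:1234"); // never contacted by the driver
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        try {
            // Pipe a record through the topology without any broker.
            ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(
                    "input-topic", new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input-topic", "key", "hello"));

            ProducerRecord<String, String> out = driver.readOutput(
                    "output-topic", new StringDeserializer(), new StringDeserializer());
            // Throws an AssertionError if key/value do not match.
            OutputVerifier.compareKeyValue(out, "key", "HELLO");
        } finally {
            driver.close();
        }
    }
}
```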
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/x/QJ5zB">KIP-220</a>
enables you to provide configuration parameters for the embedded admin client created by Kafka Streams, similar to the embedded producer and consumer clients.
You can provide the configs via <code>StreamsConfig</code> by adding the configs with the prefix <code>admin.</code> as defined by <code>StreamsConfig#adminClientPrefix(String)</code>
to distinguish them from configurations of other clients that share the same config names.
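For instance, a minimal sketch of a config with <code>admin.</code>-prefixed entries (the application id and the specific values are hypothetical):

```java
import java.util.Properties;

public class AdminClientConfigExample {
    // Builds a Streams config whose "admin."-prefixed entries are forwarded
    // (with the prefix stripped) to the embedded admin client, per KIP-220.
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092");
        // Equivalent to using StreamsConfig.adminClientPrefix("retries"):
        props.put("admin.retries", "5");
        props.put("admin.request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("admin.retries"));
    }
}
```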
<p> Changes in <code>StreamsResetter</code>: </p>
<ul>
<li> added options to specify input topics offsets to reset according to <a href="https://cwiki.apache.org/confluence/x/ApI7B">KIP-171</a></li>
</ul>
<h3 class="anchor-heading"><a id="streams_api_changes_100" class="anchor-link"></a><a href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>
and <code>TopologyBuilder</code> but not part of the actual API are not present
in the new classes any longer.
Furthermore, some overloads were simplified compared to the original classes.
See <a href="https://cwiki.apache.org/confluence/x/uR8IB">KIP-120</a>
and <a href="https://cwiki.apache.org/confluence/x/TYZjB">KIP-182</a>
for full details.
</p>
New methods in <code>KStream</code>:
</p>
<ul>
<li>With the introduction of <a href="https://cwiki.apache.org/confluence/x/66JjB">KIP-202</a>
a new method <code>merge()</code> has been created in <code>KStream</code> as the StreamsBuilder class's <code>StreamsBuilder#merge()</code> has been removed.
The method signature was also changed: instead of passing multiple <code>KStream</code>s into the method at once, only a single <code>KStream</code> is accepted.
With the introduction of <a href="https://cwiki.apache.org/confluence/x/TYZjB">KIP-182</a>
you should no longer pass in <code>Serde</code> to <code>KStream#print</code> operations.
If you can't rely on using <code>toString</code> to print your keys and values, you should instead provide a custom <code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code> call.
</li>
</p>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/x/WQgwB">KIP-161</a>
enables you to provide a default exception handler for deserialization errors when reading data from Kafka rather than throwing the exception all the way out of your streams application.
You can provide the configs via the <code>StreamsConfig</code> as <code>StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
The specified handler must implement the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.
</p>
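A minimal sketch of configuring the handler via the string form of the config key, using the built-in <code>LogAndContinueExceptionHandler</code> as the example implementation (the app id and servers are hypothetical):

```java
import java.util.Properties;

public class DeserializationHandlerConfigExample {
    // "default.deserialization.exception.handler" is the string form of
    // StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
    // LogAndContinueExceptionHandler is one built-in implementation that
    // logs the corrupt record and keeps processing.
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "handler-demo");      // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092");
        props.put("default.deserialization.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("default.deserialization.exception.handler"));
    }
}
```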
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/x/aZM7B">KIP-173</a>
enables you to provide topic configuration parameters for any topics created by Kafka Streams.
This includes repartition and changelog topics.
You can provide the configs via the <code>StreamsConfig</code> by adding the configs with the prefix as defined by <code>StreamsConfig#topicPrefix(String)</code>.
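A minimal sketch of <code>topic.</code>-prefixed configs applied to internal topics (the retention and cleanup values are illustrative):

```java
import java.util.Properties;

public class InternalTopicConfigExample {
    // Keys with the "topic." prefix (StreamsConfig#topicPrefix) are applied to
    // the repartition and changelog topics that Kafka Streams creates.
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "topic-config-demo"); // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092");
        props.put("topic.retention.ms", "604800000");     // example: 7 days
        props.put("topic.cleanup.policy", "delete");      // example policy
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("topic.cleanup.policy"));
    }
}
```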
</p>
<ul>
<li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka Streams application no longer interacts with ZooKeeper for topic management but uses the new broker admin protocol </li>
<li> added many new parameters for metrics, security, and client configurations </li>
</ul>
<li> added overloads for <code>join()</code> to join with <code>KTable</code></li>
<li> added overloads for <code>join()</code> and <code>leftJoin()</code> to join with <code>GlobalKTable</code></li>
<li> note, join semantics in 0.10.2 were improved and thus you might see different result compared to 0.10.0.x and 0.10.1.x
(cf. <a href="https://cwiki.apache.org/confluence/x/EzPtAw">Kafka Streams Join Semantics</a> in the Apache Kafka wiki)</li>
</ul>
<p> Aligned <code>null</code>-key handling for <code>KTable</code> joins: </p>
<li> new alternative timestamp extractor classes <code>LogAndSkipOnInvalidTimestamp</code> and <code>UsePreviousTimeOnInvalidTimestamps</code></li>
</ul>
<p> Relaxed type constraints of many DSL interfaces, classes, and methods (cf. <a href="https://cwiki.apache.org/confluence/x/dQMIB">KIP-100</a>). </p>
<h3 class="anchor-heading"><a id="streams_api_changes_0101" class="anchor-link"></a><a href="#streams_api_changes_0101">Streams API changes in 0.10.1.0</a></h3>
(including Connect and Kafka Streams which use the clients internally) to 4.0.
Similarly, users should ensure their Java clients (including Connect and Kafka Streams) version is 2.1 or higher before upgrading brokers to 4.0.
Finally, care also needs to be taken when it comes to Kafka clients that are not part of Apache Kafka; please see
<a href="https://cwiki.apache.org/confluence/x/K5sODg">KIP-896</a> for the details.
</li>
<li>
Apache Kafka 4.0 only supports KRaft mode - ZooKeeper mode has been removed. As such, broker upgrades to 4.0.x (and higher) require KRaft mode and
group coordinator can be tuned by setting the configurations with prefix <code>group.coordinator</code>.
</li>
<li>
The Next Generation of the Consumer Rebalance Protocol (<a href="https://cwiki.apache.org/confluence/x/HhD1D">KIP-848</a>)
is now Generally Available (GA) in Apache Kafka 4.0. The protocol is automatically enabled on the server when the upgrade to 4.0 is finalized.
Note that once the new protocol is used by consumer groups, the cluster can only downgrade to version 3.4.1 or newer.
Check <a href="/{{version}}/documentation.html#consumer_rebalance_protocol">here</a> for details.
</li>
<li>
Transactions Server Side Defense (<a href="https://cwiki.apache.org/confluence/x/B40ODg">KIP-890</a>)
brings a strengthened transactional protocol to Apache Kafka 4.0. The new and improved transactional protocol is enabled when the upgrade to 4.0 is finalized.
When using 4.0 producer clients, the producer epoch is bumped on every transaction to ensure every transaction includes the intended messages and duplicates are not
written as part of the next transaction. Downgrading the protocol is safe. For more information check <a href="/{{version}}/documentation.html#transaction_protocol">here</a>.
</li>
<li>
Eligible Leader Replicas (<a href="https://cwiki.apache.org/confluence/x/mpOzDw">KIP-966 Part 1</a>)
enhances the replication protocol for Apache Kafka 4.0. Now the KRaft controller keeps track of the data partition replicas that are
not included in the ISR but are safe to be elected as leader without data loss. Such replicas are stored in the partition metadata as
the <code>Eligible Leader Replicas</code> (ELR).
is now set to <code>org.apache.kafka.common.metrics.JmxReporter</code> by default.
</li>
<li>The constructor <code>org.apache.kafka.common.metrics.JmxReporter</code> with string argument was removed.
See <a href="https://cwiki.apache.org/confluence/x/SxIRCQ">KIP-606</a> for details.
</li>
<li>The <code>bufferpool-wait-time-total</code>, <code>io-waittime-total</code>, and <code>iotime-total</code> metrics were removed.
Please use <code>bufferpool-wait-time-ns-total</code>, <code>io-wait-time-ns-total</code>, and <code>io-time-ns-total</code> metrics as replacements, respectively.
</li>
<li>The <code>log.message.timestamp.difference.max.ms</code> configuration was removed.
Please use <code>log.message.timestamp.before.max.ms</code> and <code>log.message.timestamp.after.max.ms</code> instead.
See <a href="https://cwiki.apache.org/confluence/x/thQ0Dw">KIP-937</a> for details.
</li>
<li>
The <code>remote.log.manager.copier.thread.pool.size</code> configuration default value was changed to 10 from -1.
Values of -1 are no longer valid. A minimum of 1 or higher is valid.
See <a href="https://cwiki.apache.org/confluence/x/FAqpEQ">KIP-1030</a> for details.
</li>
<li>
The <code>remote.log.manager.expiration.thread.pool.size</code> configuration default value was changed to 10 from -1.
Values of -1 are no longer valid. A minimum of 1 or higher is valid.
See <a href="https://cwiki.apache.org/confluence/x/FAqpEQ">KIP-1030</a> for details.
</li>
<li>
The <code>remote.log.manager.thread.pool.size</code> configuration default value was changed to 2 from 10.
See <a href="https://cwiki.apache.org/confluence/x/FAqpEQ">KIP-1030</a> for details.
</li>
<li>
The minimum <code>segment.bytes/log.segment.bytes</code> has changed from 14 bytes to 1MB.
See <a href="https://cwiki.apache.org/confluence/x/FAqpEQ">KIP-1030</a> for details.
</li>
</ul>
</li>
</li>
<li>kafka-configs.sh now uses the incrementalAlterConfigs API to alter broker configurations instead of the deprecated alterConfigs API,
and it will fail directly if the broker doesn't support the incrementalAlterConfigs API, which means the broker version is prior to 2.3.x.
See <a href="https://cwiki.apache.org/confluence/x/wIn5E">KIP-1011</a> for more details.
</li>
<li>The <code>kafka.admin.ZkSecurityMigrator</code> tool was removed.
</li>
<li>
The minimum Java version required by clients and Kafka Streams applications has been increased from Java 8 to Java 11
while brokers, Connect, and tools now require Java 17.
See <a href="https://cwiki.apache.org/confluence/x/P4vOCg">KIP-750</a> and
<a href="https://cwiki.apache.org/confluence/x/Bov5E">KIP-1013</a> for more details.
</li>
<li>
Java 23 support has been added in Apache Kafka 4.0.
</li>
<li>
Scala 2.12 support has been removed in Apache Kafka 4.0.
See <a href="https://cwiki.apache.org/confluence/x/OovOCg">KIP-751</a> for more details.
</li>
<li>
The logging framework has been migrated from Log4j to Log4j 2.
For implementors of RemoteLogMetadataManager (RLMM), a new API <code>nextSegmentWithTxnIndex</code> is
introduced in RLMM to allow the implementation to return the next segment metadata with a transaction
index. This API is used when consumers are enabled with the <code>READ_COMMITTED</code> isolation level.
See <a href="https://cwiki.apache.org/confluence/x/BwuTEg">KIP-1058</a> for more details.
</li>
<li>
The criteria for identifying internal topics in ReplicationPolicy and DefaultReplicationPolicy have
been updated to enable the replication of topics that appear to be internal but aren't truly internal to Kafka and Mirror Maker 2.
See <a href="https://cwiki.apache.org/confluence/x/jA3OEg">KIP-1074</a> for more details.
</li>
<li>
KIP-714 is now enabled for Kafka Streams via <a href="https://cwiki.apache.org/confluence/x/XA-OEg">KIP-1076</a>.
This allows collecting not only the metrics of the internally used clients of a Kafka Streams application via a broker-side plugin,
but also the <a href="/{{version}}/documentation/#kafka_streams_monitoring">metrics</a> of the Kafka Streams runtime itself.
</li>
<li>
The default value of 'num.recovery.threads.per.data.dir' has been changed from 1 to 2. The impact of this is faster
recovery after an unclean shutdown at the expense of extra IO cycles.
See <a href="https://cwiki.apache.org/confluence/x/FAqpEQ">KIP-1030</a> for details.
</li>
<li>
The default value of 'message.timestamp.after.max.ms' has been changed from Long.Max to 1 hour. The impact of this is that messages with a
timestamp more than 1 hour in the future will be rejected when message.timestamp.type=CreateTime is set.
See <a href="https://cwiki.apache.org/confluence/x/FAqpEQ">KIP-1030</a> for details.
</li>
<li>Introduced in KIP-890, the <code>TransactionAbortableException</code> enhances error handling within transactional operations by clearly indicating scenarios where transactions should be aborted due to errors. It is important for applications to properly manage both <code>TimeoutException</code> and <code>TransactionAbortableException</code> when working with transaction producers.</li>
<ul>
<li><b>TimeoutException:</b> This exception indicates that a transactional operation has timed out. Given the risk of message duplication that can arise from retrying operations after a timeout (potentially violating exactly-once semantics), applications should treat timeouts as reasons to abort the ongoing transaction.</li>
<li><b>TransactionAbortableException:</b> Specifically introduced to signal errors that should lead to transaction abortion, ensuring this exception is properly handled is critical for maintaining the integrity of transactional processing.</li>
<li>To ensure seamless operation and compatibility with future Kafka versions, developers are encouraged to update their error-handling logic to treat both exceptions as triggers for aborting transactions. This approach is pivotal for preserving exactly-once semantics.</li>
<li> See <a href="https://cwiki.apache.org/confluence/x/B40ODg">KIP-890</a> and
<a href="https://cwiki.apache.org/confluence/x/8ItyEg">KIP-1050</a> for more details </li>