<li>A number of deprecated classes, methods, configurations and tools have been removed.
<ul>
<li><b>Common</b>
<ul>
<li>The <code>metrics.jmx.blacklist</code> and <code>metrics.jmx.whitelist</code> configurations were removed from the <code>org.apache.kafka.common.metrics.JmxReporter</code>.
Please use <code>metrics.jmx.exclude</code> and <code>metrics.jmx.include</code> respectively instead.</li>
<li>The <code>bufferpool-wait-time-total</code>, <code>io-waittime-total</code>, and <code>iotime-total</code> metrics were removed.
Please use <code>bufferpool-wait-time-ns-total</code>, <code>io-wait-time-ns-total</code>, and <code>io-time-ns-total</code> metrics as replacements, respectively.
<li>The original MirrorMaker (MM1) and related classes were removed. Please use the Connect-based
MirrorMaker (MM2), as described in the <a href="/{{version}}/documentation/#georeplication">Geo-Replication section</a>.
</li>
<li>The <code>use.incremental.alter.configs</code> configuration was removed from <code>MirrorSourceConnector</code>.
The modified behavior is identical to the previous <code>required</code> configuration; therefore, users should ensure that brokers in the target cluster are running at least version 2.3.0.
</li>
<li>The <code>add.source.alias.to.metrics</code> configuration was removed from <code>MirrorSourceConnector</code>.
The source cluster alias is now always added to the metrics.
</li>
<li>The <code>kafka.common.MessageReader</code> class was removed. Please use the <code>org.apache.kafka.tools.api.RecordReader</code>
interface to build custom readers for the <code>kafka-console-producer</code> tool.
</li>
<li>The <code>kafka.tools.DefaultMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.DefaultMessageFormatter</code> class instead.
</li>
<li>The <code>kafka.tools.LoggingMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.LoggingMessageFormatter</code> class instead.
<li>The <code>kafka.tools.NoOpMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.NoOpMessageFormatter</code> class instead.
<li>The <code>--authorizer</code>, <code>--authorizer-properties</code>, and <code>--zk-tls-config-file</code> options were removed from the <code>kafka-acls</code> command line tool.
Please use <code>--bootstrap-server</code> or <code>--bootstrap-controller</code> instead.
</li>
<li>The <code>kafka.serializer.Decoder</code> trait was removed. Please use the <code>org.apache.kafka.tools.api.Decoder</code>
interface to build custom decoders for the <code>kafka-dump-log</code> tool.
</li>
<li>The <code>kafka.coordinator.group.OffsetsMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.OffsetsMessageFormatter</code> class instead.
</li>
<li>The <code>kafka.coordinator.group.GroupMetadataMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.GroupMetadataMessageFormatter</code> class instead.
</li>
<li>The <code>kafka.coordinator.transaction.TransactionLogMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.TransactionLogMessageFormatter</code> class instead.
</li>
<li>The <code>--topic-white-list</code> option was removed from the <code>kafka-replica-verification</code> command line tool.
<li>The <code>whitelist</code> and <code>blacklist</code> configurations were removed from the <code>org.apache.kafka.connect.transforms.ReplaceField</code> transformation.
Please use <code>include</code> and <code>exclude</code> respectively instead.
<li>For implementors of RemoteLogMetadataManager (RLMM), a new API <code>nextSegmentWithTxnIndex</code> is
introduced in RLMM to allow the implementation to return the next segment metadata with a transaction
index. This API is used when consumers are configured with the READ_COMMITTED isolation level.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058:+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions">KIP-1058</a> for more details.
</li>
<p><b>If you are upgrading from a version prior to 2.1.x, please see the note in step 5 below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
<p><b>If you are upgrading from a version prior to 3.3.0, please see the note in step 3 below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
</li>
<li>Once the cluster's behavior and performance has been verified, bump the metadata.version by running
<code>
bin/kafka-features.sh upgrade --metadata 3.9
</code>
</li>
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
Every <a href="https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java">MetadataVersion</a>
after 3.2.x has a boolean parameter that indicates if there are metadata changes (i.e. <code>IBP_3_3_IV3(7, "3.3", "IV3", true)</code> means this version has metadata changes).
Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between.</li>
<h5><a id="upgrade_390_notable" href="#upgrade_390_notable">Notable changes in 3.9.0</a></h5>
<ul>
<li>If you run your Kafka clusters with no execution permission for the <code>/tmp</code> partition, Kafka will not work properly: it might either refuse to start or fail
when producing and consuming messages. This is due to the compression libraries <code>zstd-jni</code> and <code>snappy</code>.
To remediate this problem, pass the JVM properties <code>ZstdTempFolder</code> and <code>org.xerial.snappy.tempdir</code> to Kafka, pointing them to a directory with execution permissions.
For example, this can be done via the <code>KAFKA_OPTS</code> environment variable as follows: <code>export KAFKA_OPTS="-DZstdTempFolder=/opt/kafka/tmp -Dorg.xerial.snappy.tempdir=/opt/kafka/tmp"</code>.
</li>
<li>Tiered storage is now a production-ready feature. You can check the
<a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+GA+Release+Notes">Kafka Tiered Storage GA Release Notes</a> for more details.
The following enhancements are added in this release.
<ul>
<li>In KRaft mode, the tiered storage feature can be dynamically disabled and then re-enabled at the topic level.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement">KIP-950</a> for more details.</li>
<li>With the tiered storage quota feature, users can define a maximum limit on the rate at which log segments are transferred to or retrieved from the remote storage.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas">KIP-956</a> for more details.</li>
<p><b>If you are upgrading from a version prior to 2.1.x, please see the note in step 5 below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
<p><b>If you are upgrading from a version prior to 3.3.0, please see the note in step 3 below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
</li>
<li>Once the cluster's behavior and performance has been verified, bump the metadata.version by running
<code>
bin/kafka-features.sh upgrade --metadata 3.8
</code>
</li>
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
Every <a href="https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java">MetadataVersion</a>
after 3.2.x has a boolean parameter that indicates if there are metadata changes (i.e. <code>IBP_3_3_IV3(7, "3.3", "IV3", true)</code> means this version has metadata changes).
Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between.</li>
<h5><a id="upgrade_381_notable" href="#upgrade_381_notable">Notable changes in 3.8.1</a></h5>
<ul>
<li>If you run your Kafka clusters with no execution permission for the <code>/tmp</code> partition, Kafka will not work properly: it might either refuse to start or fail
when producing and consuming messages. This is due to the compression libraries <code>zstd-jni</code> and <code>snappy</code>.
To remediate this problem, pass the JVM properties <code>ZstdTempFolder</code> and <code>org.xerial.snappy.tempdir</code> to Kafka, pointing them to a directory with execution permissions.
For example, this can be done via the <code>KAFKA_OPTS</code> environment variable as follows: <code>export KAFKA_OPTS="-DZstdTempFolder=/opt/kafka/tmp -Dorg.xerial.snappy.tempdir=/opt/kafka/tmp"</code>.
This is a known issue for version 3.8.0.
</li>
<li>In 3.8.0 the <code>kafka.utils.Throttler</code> metric was accidentally renamed to <code>org.apache.kafka.storage.internals.utils.Throttler</code>.
This change has been reverted and the metric is now named <code>kafka.utils.Throttler</code> again.
<p><b>If you are upgrading from a version prior to 2.1.x, please see the note in step 5 below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
<p><b>If you are upgrading from a version prior to 3.3.0, please see the note in step 3 below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
</li>
<li>Once the cluster's behavior and performance has been verified, bump the metadata.version by running
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
Every <a href="https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java">MetadataVersion</a>
after 3.2.x has a boolean parameter that indicates if there are metadata changes (i.e. <code>IBP_3_3_IV3(7, "3.3", "IV3", true)</code> means this version has metadata changes).
Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between.</li>
<li>Client APIs released prior to Apache Kafka 2.1 are now marked deprecated in 3.7 and will be removed in Apache Kafka 4.0. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-896%3A+Remove+old+client+protocol+API+versions+in+Kafka+4.0">KIP-896</a> for details and RPC versions that are now deprecated.
</li>
<li>Early access of the new simplified Consumer Rebalance Protocol is available; it is not recommended for use in production environments.
You are encouraged to test it and provide feedback!
For more information about the early access feature, please check <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol">KIP-848</a> and the <a href="https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes">Early Access Release Notes</a>.
<li>More metrics related to Tiered Storage have been introduced. They should improve the operational experience
of running Tiered Storage in production.
For more detailed information, please refer to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Additional+metrics+in+Tiered+Storage">KIP-963</a>.
<li>All the notable changes are present in the <a href="https://kafka.apache.org/blog#apache_kafka_370_release_announcement">blog post announcing the 3.7.0 release</a>.
<p><b>If you are upgrading from a version prior to 2.1.x, please see the note in step 5 below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
<p><b>If you are upgrading from a version prior to 3.3.0, please see the note in step 3 below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
</li>
<li>Once the cluster's behavior and performance has been verified, bump the metadata.version by running
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
Every <a href="https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java">MetadataVersion</a>
after 3.2.x has a boolean parameter that indicates if there are metadata changes (i.e. <code>IBP_3_3_IV3(7, "3.3", "IV3", true)</code> means this version has metadata changes).
Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between.</li>
<li>The configuration <code>log.message.timestamp.difference.max.ms</code> is deprecated.
Two new configurations, <code>log.message.timestamp.before.max.ms</code> and <code>log.message.timestamp.after.max.ms</code>, have been added.
For more detailed information, please refer to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-937%3A+Improve+Message+Timestamp+Validation">KIP-937</a>.
<li>Kafka Streams has introduced a new task assignor, <code>RackAwareTaskAssignor</code>, for computing task assignments that can minimize
cross-rack traffic under certain conditions. It works with the existing <code>StickyTaskAssignor</code> and <code>HighAvailabilityTaskAssignor</code>.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>
and the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a> for more details.</li>
<li>Early access of the tiered storage feature is available; it is not recommended for use in production environments.
You are encouraged to test it and provide any feedback to us.
For more information about the early access tiered storage feature, please check <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage">KIP-405</a> and the
<a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes">Tiered Storage Early Access Release Notes</a>.</li>
<li>Transaction partition verification (KIP-890)
has been added to data partitions to prevent hanging transactions. This feature is enabled by default and can be disabled by setting <code>transaction.partition.verification.enable</code> to false.
The configuration can also be updated dynamically and is applied to the broker, as sketched below. Workloads running on version 3.6.0 with compression can experience
InvalidRecordExceptions and UnknownServerExceptions. Upgrading to 3.6.1 or newer or disabling the feature fixes the issue.
</li>
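<p>For example, the verification can be toggled at runtime with the Admin client; a minimal sketch, assuming a placeholder bootstrap address (the same property can of course also be set statically in <code>server.properties</code>):</p>
<pre><code class="language-java">
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DisableTxnVerification {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address

        try (Admin admin = Admin.create(props)) {
            // An empty resource name targets the cluster-wide default broker config.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp disable = new AlterConfigOp(
                new ConfigEntry("transaction.partition.verification.enable", "false"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(broker, List.of(disable))).all().get();
        }
    }
}
</code></pre>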
<p><b>If you are upgrading from a version prior to 2.1.x, please see the note in step 5 below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
<p><b>If you are upgrading from a version prior to 3.3.0, please see the note in step 3 below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
</li>
<li>Once the cluster's behavior and performance has been verified, bump the metadata.version by running
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
Every <a href="https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java">MetadataVersion</a>
after 3.2.x has a boolean parameter that indicates if there are metadata changes (i.e. <code>IBP_3_3_IV3(7, "3.3", "IV3", true)</code> means this version has metadata changes).
Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between.</li>
<li>MirrorMaker now uses the incrementalAlterConfigs API by default to synchronize topic configurations instead of the deprecated alterConfigs API.
A new setting called <code>use.incremental.alter.configs</code> is introduced to allow users to control which API to use.
This new setting is marked deprecated and will be removed in the next major release, when the incrementalAlterConfigs API is always used.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-894%3A+Use+incrementalAlterConfigs+API+for+syncing+topic+configurations">KIP-894</a> for more details.
</li>
<li>The JmxTool, EndToEndLatency, StreamsResetter, ConsumerPerformance and ClusterTool have been migrated to the tools module.
The 'kafka.tools' package is deprecated and will change to 'org.apache.kafka.tools' in the next major release.
See <a href="https://issues.apache.org/jira/browse/KAFKA-14525">KAFKA-14525</a> for more details.
<li>In versions earlier than 3.5.0 and 3.4.1, MirrorMaker 2 offset translation could incorrectly translate offsets for topics using compaction, transactional producers, and filter SMTs.
In 3.5.0 and 3.4.1, offset translation has changed for all topics in order to ensure at-least-once delivery when a consumer is failed-over to the translated offsets.
Translated offsets will be earlier than in previous versions, so consumers using downstream offsets may initially have more lag, and re-deliver more data after failing-over.
See <a href="https://issues.apache.org/jira/browse/KAFKA-12468">KAFKA-12468</a> and linked tickets, and <a href="https://github.com/apache/kafka/pull/13178">PR #13178</a> for more details.
Further improvements to the offset translation are included in later releases to reduce the lag introduced by this change, so consider upgrading MM2 to the latest version available.
<h4><a id="upgrade_kraft_3_4_0" href="#upgrade_kraft_3_4_0">Upgrading a KRaft-based cluster to 3.4.0 from any version 3.0.x through 3.3.x</a></h4>
<p><b>If you are upgrading from a version prior to 3.3.0, please see the note below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li>Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
</li>
<li>Once the cluster's behavior and performance has been verified, bump the metadata.version by running
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
Every <a href="https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java">MetadataVersion</a>
after 3.2.x has a boolean parameter that indicates if there are metadata changes (i.e. <code>IBP_3_3_IV3(7, "3.3", "IV3", true)</code> means this version has metadata changes).
Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between.</li>
<p><b>If you are upgrading from a version prior to 3.3.1, please see the note below. Once you have changed the metadata.version to the latest version, it will not be possible to downgrade to a version prior to 3.3-IV0.</b></p>
<li>Note that cluster metadata downgrade is not supported in this version since it has metadata changes.
Every <a href="https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java">MetadataVersion</a>
after 3.2.x has a boolean parameter that indicates if there are metadata changes (i.e. <code>IBP_3_3_IV3(7, "3.3", "IV3", true)</code> means this version has metadata changes).
Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between.</li>
<li>KRaft mode is production ready for new clusters. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready">KIP-833</a> for more details.
<li>The partitioner used by default for records with no keys has been improved to avoid pathological behavior when one or more brokers are slow.
The new logic may affect the batching behavior, which can be tuned using the <code>batch.size</code> and/or <code>linger.ms</code> configuration settings.
The previous behavior can be restored by setting <code>partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner</code>.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner">KIP-794</a> for more details.
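<p>A hedged sketch of the two options mentioned above, using producer configuration only (the bootstrap address and serializers are placeholders; <code>DefaultPartitioner</code> is itself deprecated, so restoring it should only be a temporary bridge):</p>
<pre><code class="language-java">
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// Option 1: opt out of the new built-in partitioning logic and keep the pre-3.3 partitioner.
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
        "org.apache.kafka.clients.producer.internals.DefaultPartitioner");

// Option 2: keep the new logic and tune batching instead.
// props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// props.put(ProducerConfig.LINGER_MS_CONFIG, 5);

KafkaProducer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props);
</code></pre>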
<li>Introduced a new API <code>addMetricIfAbsent</code> to <code>Metrics</code>, which creates a new metric if it does not exist, or returns the same metric
if it is already registered. Note that this behaviour is different from the <code>addMetric</code> API, which throws an <code>IllegalArgumentException</code> when
trying to create an already existing metric. (See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-843%3A+Adding+addMetricIfAbsent+method+to+Metrics">KIP-843</a> for more details.)
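<p>A minimal sketch, assuming the KIP-843 signature <code>addMetricIfAbsent(MetricName, MetricConfig, MetricValueProvider)</code>; the metric name, group, and measured value are illustrative placeholders:</p>
<pre><code class="language-java">
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;

Metrics metrics = new Metrics();
MetricName name = metrics.metricName("queue-size", "my-app-metrics", "Current size of the work queue");
Measurable queueSize = (config, nowMs) -&gt; 42.0; // placeholder measurement

// Registers the metric on the first call; a later call with the same name returns the
// already-registered metric instead of throwing IllegalArgumentException as addMetric would.
metrics.addMetricIfAbsent(name, new MetricConfig(), queueSize);
metrics.addMetricIfAbsent(name, new MetricConfig(), queueSize); // no exception
</code></pre>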
<li>The following metrics have been deprecated and will be removed in Apache Kafka 4.0: <code>bufferpool-wait-time-total</code>, <code>io-waittime-total</code>,
and <code>iotime-total</code>. Please use <code>bufferpool-wait-time-ns-total</code>, <code>io-wait-time-ns-total</code>,
and <code>io-time-ns-total</code> instead. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-773%3A+Differentiate+consistently+metric+latency+measured+in+millis+and+nanos">KIP-773</a> for more details.
<li>The producer has stronger delivery guarantees by default: <code>idempotence</code> is enabled and <code>acks</code> is set to <code>all</code> instead of <code>1</code>.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default">KIP-679</a> for details.
In 3.0.0 and 3.1.0, a bug prevented the idempotence default from being applied which meant that it remained disabled unless the user had explicitly set
<code>enable.idempotence</code> to true. Note that the bug did not affect the <code>acks=all</code> change. See <ahref="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a> for more details.
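<p>As a sketch, the new defaults correspond to configuring the producer as follows; stating the properties explicitly avoids surprises across client versions, including the 3.0.0/3.1.0 releases affected by the bug above (the bootstrap address is a placeholder):</p>
<pre><code class="language-java">
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

// The 3.0+ defaults, stated explicitly.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");

// To restore the weaker pre-3.0 behavior instead:
// props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
// props.put(ProducerConfig.ACKS_CONFIG, "1");
</code></pre>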
<li>The release tarball no longer includes test, sources, javadoc and test sources jars. These are still published to the Maven Central repository. </li>
<li>A number of implementation dependency jars are <a href="https://github.com/apache/kafka/pull/10203">now available in the runtime classpath
instead of compile and runtime classpaths</a>. Compilation errors after the upgrade can be fixed by adding the missing dependency jar(s) explicitly
or updating the application not to use internal classes.</li>
<code>inter.broker.protocol.version</code> upgrade to 3.0. This will avoid potential compatibility issues if the <code>inter.broker.protocol.version</code>
is downgraded. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1">KIP-724</a> for more details.</li>
<li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<ahref="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
<li>Custom principal builder implementations specified through <code>principal.builder.class</code> must now implement the
<code>KafkaPrincipalSerde</code> interface to allow for forwarding between brokers. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller">KIP-590</a> for more details about the usage of KafkaPrincipalSerde.</li>
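<p>A hedged sketch of a principal builder that also implements <code>KafkaPrincipalSerde</code>; the principal derivation and the serialization format (type and name joined by a colon) are purely illustrative, not a prescribed format:</p>
<pre><code class="language-java">
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;

public class MyPrincipalBuilder implements KafkaPrincipalBuilder, KafkaPrincipalSerde {

    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        // Placeholder: a real implementation would inspect the AuthenticationContext
        // (SSL, SASL, etc.) to derive the principal name.
        return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
    }

    @Override
    public byte[] serialize(KafkaPrincipal principal) throws SerializationException {
        return (principal.getPrincipalType() + ":" + principal.getName()).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public KafkaPrincipal deserialize(byte[] bytes) throws SerializationException {
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split(":", 2);
        if (parts.length != 2)
            throw new SerializationException("Unexpected principal format");
        return new KafkaPrincipal(parts[0], parts[1]);
    }
}
</code></pre>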
<li>A number of deprecated classes, methods and tools have been removed from the <code>clients</code>, <code>connect</code>, <code>core</code> and <code>tools</code> modules:</li>
<li>The Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>
and <code>AclAuthorizer</code> instead.</li>
<li>The <code>Metric#value()</code> method was removed (<ahref="https://issues.apache.org/jira/browse/KAFKA-12573">KAFKA-12573</a>).</li>
<li>In the <code>kafka-configs</code> command line tool, the <code>--zookeeper</code> option is only supported for updating <a href="#security_sasl_scram_credentials">SCRAM Credentials configuration</a>
and <a href="#dynamicbrokerconfigs">describing/updating dynamic broker configs when brokers are not running</a>. Please use <code>--bootstrap-server</code> for other configuration operations.</li>
<li>The default <code>partition.assignment.strategy</code> is changed to "[RangeAssignor, CooperativeStickyAssignor]",
which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.
Please check the client upgrade path guide <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-429:+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-Consumer">here</a> for more detail.</li>
<li>The <code>quota.producer.default</code> and <code>quota.consumer.default</code> configurations were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>).
<li>The <code>port</code> and <code>host.name</code> configurations were removed. Please use <code>listeners</code> instead.</li>
<li>The <code>advertised.port</code> and <code>advertised.host.name</code> configurations were removed. Please use <code>advertised.listeners</code> instead.</li>
<li>The deprecated Kafka Connect worker configurations <code>rest.host.name</code> and <code>rest.port</code> were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12482">KAFKA-12482</a>).
<li> The <code>Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)</code> method has been deprecated. Please use
<code>Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)</code> instead, where the <code>ConsumerGroupMetadata</code>
can be retrieved via <code>KafkaConsumer#groupMetadata()</code> for stronger semantics, as sketched below. Note that the full set of consumer group metadata is only
understood by brokers of version 2.5 or higher, so you must upgrade your Kafka cluster to get the stronger semantics. Otherwise, you can just pass
in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
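<p>A minimal consume-transform-produce sketch of the recommended overload; the topic name and the surrounding transactional setup are placeholders, and <code>producer</code>/<code>consumer</code> are assumed to be already configured (with the producer transactional):</p>
<pre><code class="language-java">
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

producer.beginTransaction();
Map&lt;TopicPartition, OffsetAndMetadata&gt; offsets = new HashMap&lt;&gt;();
ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord&lt;String, String&gt; record : records) {
    producer.send(new ProducerRecord&lt;&gt;("output-topic", record.key(), record.value()));
    offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
}
// Pass the full consumer group metadata (brokers 2.5+) instead of just the group id string.
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
</code></pre>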
<li> The Connect <code>internal.key.converter</code> and <code>internal.value.converter</code> properties have been completely <a href="https://cwiki.apache.org/confluence/x/2YDOCg">removed</a>.
The use of these Connect worker properties has been deprecated since version 2.0.0.
Workers are now hardcoded to use the JSON converter with <code>schemas.enable</code> set to <code>false</code>. If your cluster has been using
a different internal key or value converter, you can follow the migration steps outlined in <a href="https://cwiki.apache.org/confluence/x/2YDOCg">KIP-738</a>.</li>
<li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics.
The existing <code>DefaultReplicationPolicy</code> is still used by default, but identity replication can be enabled via the
<code>replication.policy.class</code> configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for
use cases with simple one-way replication topologies where topic renaming is undesirable. Note that <code>IdentityReplicationPolicy</code>, unlike
<code>DefaultReplicationPolicy</code>, cannot prevent replication cycles based on topic names, so take care to avoid cycles when constructing your
replication topology.</li>
<li> The ability to print record offsets and headers with the <code>ConsoleConsumer</code> is now possible
via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter">KIP-431</a>.</li>
<li>Added a new public API <code>KafkaStreams.queryMetadataForKey(String, K, Serializer)</code> to get detailed information on the key being queried.
It provides information about the partition number where the key resides, in addition to the hosts containing the active and standby partitions for the key.</li>
<li>Provided support to query stale stores (for high availability) and the stores belonging to a specific partition by deprecating <code>KafkaStreams.store(String, QueryableStoreType)</code> and replacing it with <code>KafkaStreams.store(StoreQueryParameters)</code>.</li>
<li>Added a new public API to access lag information for stores local to an instance with <code>KafkaStreams.allLocalStorePartitionLags()</code>.</li>
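<p>A hedged sketch of these interactive-query calls; the store name <code>"counts-store"</code>, the key, and the already-started <code>KafkaStreams</code> instance are placeholders:</p>
<pre><code class="language-java">
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// streams is an already-started KafkaStreams instance with a store named "counts-store".
KeyQueryMetadata metadata =
    streams.queryMetadataForKey("counts-store", "some-key", Serdes.String().serializer());

// Query the store, optionally allowing stale (standby) replicas for high availability.
ReadOnlyKeyValueStore&lt;String, Long&gt; store = streams.store(
    StoreQueryParameters.fromNameAndType("counts-store", QueryableStoreTypes.&lt;String, Long&gt;keyValueStore())
        .enableStaleStores());

// Per-store, per-partition lag for the state stores hosted on this instance.
Map&lt;String, Map&lt;Integer, LagInfo&gt;&gt; lags = streams.allLocalStorePartitionLags();
</code></pre>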
<li>ZooKeeper has been upgraded to 3.5.7, and a ZooKeeper upgrade from 3.4.X to 3.5.7 can fail if there are no snapshot files in the 3.4 data directory.
This usually happens in test upgrades where ZooKeeper 3.5.7 is trying to load an existing 3.4 data dir in which no snapshot file has been created.
For more details about the issue please refer to <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-3056">ZOOKEEPER-3056</a>.
A fix is given in <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-3056">ZOOKEEPER-3056</a>, which is to set <code>snapshot.trust.empty=true</code>
config in <code>zookeeper.properties</code> before the upgrade.
</li>
<li>ZooKeeper version 3.5.7 supports TLS-encrypted connectivity to ZooKeeper, with or without client certificates,
and additional Kafka configurations are available to take advantage of this.
See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication">KIP-515</a> for details.
<h4><a id="upgrade_2_4_0" href="#upgrade_2_4_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x, 2.1.x, 2.2.x or 2.3.x to 2.4.0</a></h4>
<p><b>If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.</b></p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li> Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
<li>ZooKeeper has been upgraded to 3.5.6. ZooKeeper upgrade from 3.4.X to 3.5.6 can fail if there are no snapshot files in 3.4 data directory.
This usually happens in test upgrades where ZooKeeper 3.5.6 is trying to load an existing 3.4 data dir in which no snapshot file has been created.
For more details about the issue please refer to <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-3056">ZOOKEEPER-3056</a>.
A fix is given in <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-3056">ZOOKEEPER-3056</a>, which is to set <code>snapshot.trust.empty=true</code>
config in <code>zookeeper.properties</code> before the upgrade. But we have observed data loss in standalone cluster upgrades when using
<code>snapshot.trust.empty=true</code> config. For more details about the issue please refer to <ahref="https://issues.apache.org/jira/browse/ZOOKEEPER-3644">ZOOKEEPER-3644</a>.
So we recommend the safe workaround of copying an empty <a href="https://issues.apache.org/jira/secure/attachment/12928686/snapshot.0">snapshot</a> file to the 3.4 data directory,
if there are no snapshot files in the 3.4 data directory. For more details about the workaround please refer to the <a href="https://cwiki.apache.org/confluence/display/ZOOKEEPER/Upgrade+FAQ">ZooKeeper Upgrade FAQ</a>.
AdminServer is enabled by default in ZooKeeper and is started on port 8080.
AdminServer is disabled by default in the ZooKeeper config (<code>zookeeper.properties</code>) provided by the Apache Kafka distribution.
Make sure to update your local <code>zookeeper.properties</code> file with <code>admin.enableServer=false</code> if you wish to disable the AdminServer.
<li>A new Admin API has been added for partition reassignments. Due to changing the way Kafka propagates reassignment information,
it is possible to lose reassignment state in failure edge cases while upgrading to the new version. It is not recommended to start reassignments while upgrading.</li>
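<p>A minimal sketch of the new Admin API; the topic, partition and target broker ids are placeholders, and exception handling is omitted:</p>
<pre><code class="language-java">
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder

try (Admin admin = Admin.create(props)) {
    // Move partition 0 of "my-topic" onto brokers 1, 2 and 3.
    TopicPartition partition = new TopicPartition("my-topic", 0);
    NewPartitionReassignment reassignment = new NewPartitionReassignment(List.of(1, 2, 3));
    admin.alterPartitionReassignments(Map.of(partition, Optional.of(reassignment))).all().get();

    // An empty Optional cancels an in-progress reassignment for that partition:
    // admin.alterPartitionReassignments(Map.of(partition, Optional.empty())).all().get();
}
</code></pre>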
<li>The <code>bin/kafka-preferred-replica-election.sh</code> command line tool has been deprecated. It has been replaced by <code>bin/kafka-leader-election.sh</code>.</li>
<li>The methods <code>electPreferredLeaders</code> in the Java <code>AdminClient</code> class have been deprecated in favor of the methods <code>electLeaders</code>.</li>
<li>Scala code leveraging the <code>NewTopic(String, int, short)</code> constructor with literal values will need to explicitly call <code>toShort</code> on the second literal.</li>
<li>The internal <code>PartitionAssignor</code> interface has been deprecated and replaced with a new <code>ConsumerPartitionAssignor</code> in the public API. Some
methods/signatures are slightly different between the two interfaces. Users implementing a custom PartitionAssignor should migrate to the new interface as soon as possible.
<li>The <code>DefaultPartitioner</code> now uses a sticky partitioning strategy. This means that records for a specific topic with null keys and no assigned partition
will be sent to the same partition until the batch is ready to be sent. When a new batch is created, a new partition is chosen. This decreases latency to produce, but
it may result in uneven distribution of records across partitions in edge cases. Generally users will not be impacted, but this difference may be noticeable in tests and
other situations producing records in a very short amount of time.</li>
<li>The blocking <code>KafkaConsumer#committed</code> methods have been extended to allow a list of partitions as input parameters rather than a single partition.
This enables fewer request/response round trips between clients and brokers when fetching the committed offsets for the consumer group.
The old overloaded functions are deprecated and we recommend users make code changes to leverage the new methods (details
can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-520%3A+Add+overloaded+Consumer%23committed+for+batching+partitions">KIP-520</a>).
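<p>A minimal sketch of the batched variant (the topic, partitions and the already-configured <code>consumer</code> are placeholders):</p>
<pre><code class="language-java">
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

TopicPartition p0 = new TopicPartition("my-topic", 0);
TopicPartition p1 = new TopicPartition("my-topic", 1);

// A single request/response round trip fetches the committed offsets for both partitions.
Map&lt;TopicPartition, OffsetAndMetadata&gt; committed = consumer.committed(Set.of(p0, p1));
</code></pre>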
the validation due to various causes (mismatched magic bytes, CRC checksum errors, null key for log compacted topics, etc.), the whole batch would be rejected
with the same misleading <code>CORRUPT_MESSAGE</code> error, and the caller of the producer client would see the corresponding exception either from
the future object of <code>RecordMetadata</code> returned from the <code>send</code> call or in the <code>Callback#onCompletion(RecordMetadata metadata, Exception exception)</code> callback.
Now, with the new error code and improved error messages of the exception, producer callers are better informed about the root cause of why their sent records failed.
<li>We are introducing incremental cooperative rebalancing to the clients' group protocol, which allows consumers to keep all of their assigned partitions during a rebalance
and at the end revoke only those which must be migrated to another consumer for overall cluster balance. The <code>ConsumerCoordinator</code> will choose the latest <code>RebalanceProtocol</code>
that is commonly supported by all of the consumer's supported assignors. You can use the new built-in <code>CooperativeStickyAssignor</code> or plug in your own custom cooperative assignor. To do
so you must implement the <code>ConsumerPartitionAssignor</code> interface and include <code>RebalanceProtocol.COOPERATIVE</code> in the list returned by <code>ConsumerPartitionAssignor#supportedProtocols</code>.
Your custom assignor can then leverage the <code>ownedPartitions</code> field in each consumer's <code>Subscription</code> to give partitions back to their previous owners whenever possible. Note that when
a partition is to be reassigned to another consumer, it <em>must</em> be removed from the new assignment until it has been revoked from its original owner. Any consumer that has to revoke a partition will trigger
a follow-up rebalance to allow the revoked partition to safely be assigned to its new owner. See the
<a href="https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html">ConsumerPartitionAssignor RebalanceProtocol javadocs</a> for more information.
<br>
To upgrade from the old (eager) protocol, which always revokes all partitions before rebalancing, to cooperative rebalancing, you must follow a specific upgrade path to get all clients on the same <code>ConsumerPartitionAssignor</code>
that supports the cooperative protocol. This can be done with two rolling bounces, using the <code>CooperativeStickyAssignor</code> for the example: during the first one, add "cooperative-sticky" to the list of supported assignors
for each member (without removing the previous assignor -- note that if previously using the default, you must include that explicitly as well). You then bounce and/or upgrade it.
Once the entire group is on 2.4+ and all members have the "cooperative-sticky" among their supported assignors, remove the other assignor(s) and perform a second rolling bounce so that by the end all members support only the
cooperative protocol. For further details on the cooperative rebalancing protocol and upgrade path, see <a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>.
</li>
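<p>A hedged configuration sketch of the two bounces described above, using the full assignor class names (the bootstrap address and group id are placeholders):</p>
<pre><code class="language-java">
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder

// First rolling bounce: support both the cooperative assignor and the previous (default) one.
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          Arrays.asList(CooperativeStickyAssignor.class.getName(), RangeAssignor.class.getName()));

// Second rolling bounce, once every member runs 2.4+ with the configuration above:
// props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
</code></pre>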
<li>There are some behavioral changes to the <code>ConsumerRebalanceListener</code>, as well as a new API. Exceptions thrown during any of the listener's three callbacks will no longer be swallowed, and will instead be re-thrown
all the way up to the <code>Consumer.poll()</code> call. The <code>onPartitionsLost</code> method has been added to allow users to react to abnormal circumstances where a consumer may have lost ownership of its partitions
(such as a missed rebalance) and cannot commit offsets. By default, this will simply call the existing <code>onPartitionsRevoked</code> API to align with previous behavior. Note however that <code>onPartitionsLost</code> will not
be called when the set of lost partitions is empty. This means that no callback will be invoked at the beginning of the first rebalance of a new consumer joining the group.
<br>
The semantics of the <code>ConsumerRebalanceListener</code>'s callbacks are further changed when following the cooperative rebalancing protocol described above. In addition to <code>onPartitionsLost</code>, <code>onPartitionsRevoked</code>
will also never be called when the set of revoked partitions is empty. The callback will generally be invoked only at the end of a rebalance, and only on the set of partitions that are being moved to another consumer. The
<code>onPartitionsAssigned</code> callback will however always be called, even with an empty set of partitions, as a way to notify users of a rebalance event (this is true for both cooperative and eager). For details on
the new callback semantics, see the <a href="https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html">ConsumerRebalanceListener javadocs</a>.
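<p>A minimal sketch of a listener that overrides the new callback (the logging is illustrative only):</p>
<pre><code class="language-java">
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class LoggingRebalanceListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection&lt;TopicPartition&gt; partitions) {
        // Called before partitions are handed to another consumer; commit offsets here if needed.
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection&lt;TopicPartition&gt; partitions) {
        // Always invoked at the end of a rebalance, even with an empty set of partitions.
        System.out.println("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsLost(Collection&lt;TopicPartition&gt; partitions) {
        // Ownership was lost abnormally (e.g. a missed rebalance); do not try to commit these offsets.
        System.out.println("Lost: " + partitions);
    }
}
</code></pre>
<p>The listener is registered via <code>consumer.subscribe(topics, new LoggingRebalanceListener())</code>.</p>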
<h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x, 2.1.x or 2.2.x to 2.3.0</a></h4>
<li>The new Connect protocol does not require stopping all the tasks during a rebalancing phase between Connect workers. Instead, only the tasks that need to be exchanged
between workers are stopped, and they are started in a follow-up rebalance. The new Connect protocol is enabled by default beginning with 2.3.0.
For more details on how it works and how to enable the old behavior of eager rebalancing, check out the incremental cooperative rebalancing documentation.</li>
<li> We are introducing static membership for consumers. This feature reduces unnecessary rebalances during normal application upgrades or rolling bounces.
<li> Kafka Streams DSL switches its used store types. While this change is mainly transparent to users, there are some corner cases that may require code changes.
<h4><a id="upgrade_2_2_0" href="#upgrade_2_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x to 2.2.0</a></h4>
<h5><a id="upgrade_220_notable" href="#upgrade_220_notable">Notable changes in 2.2.0</a></h5>
<ul>
<li>The default consumer group id has been changed from the empty string (<code>""</code>) to <code>null</code>. Consumers that use the new default group id will not be able to subscribe to topics,
or fetch or commit offsets. The empty string as consumer group id is deprecated but will be supported until a future major release. Old clients that rely on the empty string group id will now
have to explicitly provide it as part of their consumer config. For more information see KIP-289.</li>
<li>The <code>bin/kafka-topics.sh</code> command line tool is now able to connect directly to brokers with <code>--bootstrap-server</code> instead of ZooKeeper. The old <code>--zookeeper</code>
option is still available for now. Please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-377%3A+TopicCommand+to+use+AdminClient">KIP-377</a> for more information.</li>
<h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0</a></h4>
<li>The default for console consumer's <code>enable.auto.commit</code> property when no <code>group.id</code> is provided is now set to <code>false</code>.
This is to avoid polluting the consumer coordinator cache as the auto-generated group is not likely to be used by other consumers.</li>
<li>The default value for the producer's <code>retries</code> config was changed to <code>Integer.MAX_VALUE</code>, as we introduced <code>delivery.timeout.ms</code>
in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer">KIP-91</a>,
which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker.
To bound how long the producer keeps retrying, you should now override <code>delivery.timeout.ms</code> instead of <code>retries</code>.</li>
<li>The <code>ListGroup</code> API now expects, as a recommended alternative, <code>Describe Group</code> access to the groups a user should be able to list.
Even though the old <code>Describe Cluster</code> access is still supported for backward compatibility, using it for this API is not advised.</li>
<li><a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87298242">KIP-336</a> deprecates the ExtendedSerializer and ExtendedDeserializer interfaces and
propagates the usage of Serializer and Deserializer. ExtendedSerializer and ExtendedDeserializer were introduced with
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers">KIP-82</a> to provide record headers for serializers and deserializers
in a Java 7 compatible fashion. These interfaces have now been consolidated, as Java 7 support has since been dropped.</li>
<li>Unclean leader election is automatically enabled by the controller when <code>unclean.leader.election.enable</code> config is dynamically updated by using per-topic config override.</li>
<li>The <code>AdminClient</code> has added a method <code>AdminClient#metrics()</code>. Now any application using the <code>AdminClient</code> can gain more information
and insight by viewing the metrics captured from the <code>AdminClient</code>. For more information
see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-324%3A+Add+method+to+get+metrics%28%29+in+AdminClient">KIP-324</a>.
<li>Kafka now supports Zstandard compression from <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression">KIP-110</a>.
You must upgrade the broker as well as clients to make use of it. Consumers prior to 2.1.0 will not be able to read from topics which use
Zstandard compression, so you should not enable it for a topic until all downstream consumers are upgraded. See the KIP for more detail.
<p><b>NOTE:</b> any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.
<li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a> increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config <code>offsets.retention.minutes</code> to 1440.</li>
<li> The default value for <code>ssl.endpoint.identification.algorithm</code> was changed to <code>https</code>, which performs hostname verification (man-in-the-middle attacks are possible otherwise). Set <code>ssl.endpoint.identification.algorithm</code> to an empty string to restore the previous behaviour. </li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a> extends the lower bound of <code>max.connections.per.ip</code> to zero, which allows IP-based filtering of inbound connections.</li>
<li>An API version tag was added to the metric <code>kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}</code>.
This metric now becomes <code>kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...}</code>. This will impact
JMX monitoring tools that do not automatically aggregate. To get the total count for a specific request type, the tool needs to be updated to aggregate across different versions.</li>
<li><a href="https://cwiki.apache.org/confluence/x/uaBzB">KIP-225</a> changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" has been removed.</li>
in the Scala producers. Users migrating should consider configuring a custom partitioner that retains the previous behaviour.
Note that the Scala producers in 1.1.0 (and older) will continue to work even if the brokers are upgraded to 2.0.0.</li>
<li>MirrorMaker and ConsoleConsumer no longer support the Scala consumer, they always use the Java consumer.</li>
<li>The ConsoleProducer no longer supports the Scala producer, it always uses the Java producer.</li>
<li>A number of deprecated tools that rely on the Scala clients have been removed: ReplayLogProducer, SimpleConsumerPerformance, SimpleConsumerShell, ExportZkOffsets, ImportZkOffsets, UpdateOffsetsInZK, VerifyConsumerRebalance.</li>
<li>New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
<li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for Kafka Streams repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.</li>
<li>Updated <code>ProcessorStateManager</code> APIs in Kafka Streams for registering state stores to the processor topology. For more details please read the Streams <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Upgrade Guide</a>.</li>
<li>In earlier releases, Connect's worker configuration required the <code>internal.key.converter</code> and <code>internal.value.converter</code> properties.
In 2.0, these are <a href="https://cwiki.apache.org/confluence/x/AZQ7B">no longer required</a> and default to the JSON converter.
You may safely remove these properties from your Connect standalone and distributed worker configurations.</li>
<li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
<li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li>
<li>KIP-283 improves message down-conversion
handling on the Kafka broker, which has typically been a memory-intensive operation. The KIP adds a mechanism by which the operation becomes less memory intensive
by down-converting chunks of partition data at a time, which helps put an upper bound on memory consumption. With this improvement, there is a change in
<code>FetchResponse</code> protocol behavior where the broker could send an oversized message batch towards the end of the response with an invalid offset.
Such oversized messages must be ignored by consumer clients, as is done by <code>KafkaConsumer</code>.
<p>KIP-283 also adds new topic and broker configurations <code>message.downconversion.enable</code> and <code>log.message.downconversion.enable</code> respectively
to control whether down-conversion is enabled. When disabled, the broker does not perform any down-conversion and instead sends an <code>UNSUPPORTED_VERSION</code>
error to the client.</p></li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-219+-+Improve+quota+communication">KIP-219</a>: Bump up the protocol versions of non-cluster action requests and responses that are throttled on quota violation.</li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>: Bump up the protocol versions of ACL create, describe and delete requests and responses.</li>
<li> Upgrading your Streams application from 1.1 to 2.0 does not require a broker upgrade.
A Kafka Streams 2.0 application can connect to 2.0, 1.1, 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> Note that in 2.0 we have removed the public APIs that were deprecated prior to 1.0; users relying on those deprecated APIs need to make code changes accordingly.
See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Streams API changes in 2.0.0</a> for more details. </li>
<li><a href="https://cwiki.apache.org/confluence/x/uaBzB">KIP-225</a> changed the metric "records.lag" to use tags for topic and partition. The original version with the name format "{topic}-{partition}.records-lag" is deprecated and will be removed in 2.0.0.</li>
<li>Kafka Connect now supports message headers in both sink and source connectors, and to manipulate them via simple message transforms. Connectors must be changed to explicitly use them. A new <code>HeaderConverter</code> is introduced to control how headers are (de)serialized, and the new "SimpleHeaderConverter" is used by default to use string representations of values.</li>
<li>Topic deletion is now enabled by default, since the functionality is now stable. Users who wish
to retain the previous behavior should set the broker config <code>delete.topic.enable</code> to <code>false</code>. Keep in mind that topic deletion removes data and the operation is not reversible (i.e. there is no "undelete" operation)</li>
<li>For topics that support timestamp search, if no offset can be found for a partition, that partition is now included in the search result with a null offset value. Previously, the partition was not included in the map.
This change was made to make the search behavior consistent with the case of topics not supporting timestamp search.</li>
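<p>A minimal, hypothetical sketch of what this looks like on the consumer side (the topic, timestamp and variable names are illustrative, and an existing <code>KafkaConsumer</code> named <code>consumer</code> is assumed):</p>
<pre><code>import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Partitions for which no offset exists now map to null instead of being absent.
TopicPartition tp = new TopicPartition("my-topic", 0);
Map&lt;TopicPartition, OffsetAndTimestamp&gt; result =
        consumer.offsetsForTimes(Collections.singletonMap(tp, 1514764800000L));
OffsetAndTimestamp found = result.get(tp);
if (found == null) {
    // no offset for this timestamp; previously the key was simply missing from the map
} else {
    consumer.seek(tp, found.offset());
}</code></pre>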
<li>A broker will now stay online to serve replicas on live log directories even if there are offline log directories. A log directory may become offline due to an IOException
caused by hardware failure. Users need to monitor the per-broker metric <code>offlineLogDirectoryCount</code> to check
whether there are offline log directories. </li>
<li>Added KafkaStorageException which is a retriable exception. KafkaStorageException will be converted to NotLeaderForPartitionException in the response if the client's request version does not support KafkaStorageException.</li>
<li>The deprecated tool <code>kafka-consumer-offset-checker.sh</code> has been removed. Use <code>kafka-consumer-groups.sh</code> to get consumer group details.</li>
<li> If you are monitoring Streams metrics, you will need to make some changes to the metric names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
<li> A few public APIs, including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code>, <code>KStreamBuilder</code>, and <code>TopologyBuilder</code>, have been deprecated by new APIs.
We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade. </li>
</ul>
<h5><a id="upgrade_100_streams_from_0102" href="#upgrade_100_streams_from_0102">Upgrading a 0.10.2 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.2 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> If you are monitoring Streams metrics, you will need to make some changes to the metric names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
<li> A few public APIs, including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code>, <code>KStreamBuilder</code>, and <code>TopologyBuilder</code>, have been deprecated by new APIs.
We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade. </li>
<li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated. </li>
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> for more details. </li>
</ul>
<h5><aid="upgrade_100_streams_from_0101"href="#upgrade_1100_streams_from_0101">Upgrading a 0.10.1 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.1 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> If you are monitoring Streams metrics, you will need to make some changes to the metric names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
<li> A few public APIs, including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code>, <code>KStreamBuilder</code>, and <code>TopologyBuilder</code>, have been deprecated by new APIs.
We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade; a sketch of the new punctuation API is shown after this list. </li>
<li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated. </li>
<li> If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed. </li>
<li> If you register custom metrics, you will need to update this code, because the <code>StreamsMetric</code> interface was changed. </li>
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_100">Streams API changes in 1.0.0</a>,
<a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> and
<a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
</ul>
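<p>As a hedged illustration of one such change (this sketch is not from the original notes), the deprecated <code>Processor#punctuate()</code> callback is replaced in 1.0 by a <code>Punctuator</code> registered through <code>ProcessorContext#schedule()</code>:</p>
<pre><code>import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Illustrative only: inside your Processor implementation, register a wall-clock
// punctuator in init() instead of overriding the deprecated Processor#punctuate().
private ProcessorContext context;

@Override
public void init(ProcessorContext context) {
    this.context = context;
    context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        // periodic work that previously lived in punctuate()
        context.commit();
    });
}</code></pre>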
<h5><aid="upgrade_100_streams_from_0100"href="#upgrade_100_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.0 to 1.0 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 1.0 application can only connect to 1.0, 0.11.0, 0.10.2, or 0.10.1 brokers. </li>
<li> There are a couple of API changes that are not backward compatible (cf. <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_100">Streams API changes in 1.0.0</a>,
<a href="/{{version}}/documentation/streams#streams_api_changes_0110">Streams API changes in 0.11.0</a>,
<a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a>, and
<a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> Upgrading from 0.10.0.x to 1.0.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for the first upgrade phase.
As an alternative, an offline upgrade is also possible.
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 1.0.2 </li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 1.0.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code></li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
</li>
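<p>For illustration only (this snippet is not part of the original notes), the first-phase setting is supplied like any other Streams config; newer client versions also expose it as the <code>StreamsConfig.UPGRADE_FROM_CONFIG</code> constant:</p>
<pre><code>import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative only: phase one of the rolling bounce sets upgrade.from;
// phase two removes it again before the second round of bounces.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put("upgrade.from", "0.10.0");</code></pre>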
<p>Kafka 0.11.0.0 introduces a new message format version as well as wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_1100_notable">notable changes in 0.11.0.0</a> before upgrading.
<li> Once the entire cluster is upgraded, bump the protocol version by editing <code>inter.broker.protocol.version</code> and setting it to 0.11.0, but
do not change <code>log.message.format.version</code> yet. </li>
<li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated. </li>
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> for more details. </li>
</ul>
<h5><a id="upgrade_1100_streams_from_0101" href="#upgrade_1100_streams_from_0101">Upgrading a 0.10.1 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.1 to 0.11.0 does not require a broker upgrade.
A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replacement configuration parameters, as these configs are deprecated. </li>
<li> If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed. </li>
<li> If you register custom metrics, you will need to update this code, because the <code>StreamsMetric</code> interface was changed. </li>
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> and
<a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
</ul>
<h5><aid="upgrade_1100_streams_from_0100"href="#upgrade_1100_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.0 to 0.11.0 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.11.0 application can only connect to 0.11.0, 0.10.2, or 0.10.1 brokers. </li>
<li> There are a couple of API changes that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0110">Streams API changes in 0.11.0</a>,
<a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a>, and
<a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> Upgrading from 0.10.0.x to 0.11.0.3 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for the first upgrade phase.
As an alternative, an offline upgrade is also possible.
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.11.0.3 </li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code></li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
</li>
<li>Added user headers support through a new Headers interface providing user headers read and write access.</li>
<li>ProducerRecord and ConsumerRecord expose the new Headers API via <code>Headers headers()</code> method call.</li>
<li>ExtendedSerializer and ExtendedDeserializer interfaces are introduced to support serialization and deserialization for headers. Headers will be ignored if the configured serializer and deserializer are not the above classes.</li>
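<p>A minimal, hypothetical sketch of the new API from the producer and consumer side (the topic, key and header values below are illustrative, not from the original text):</p>
<pre><code>import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

// Illustrative only: attach a header when producing a record.
ProducerRecord&lt;String, String&gt; record = new ProducerRecord&lt;&gt;("my-topic", "key", "value");
record.headers().add("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));

// On the consuming side, for each ConsumerRecord&lt;String, String&gt; named "consumed":
// Header h = consumed.headers().lastHeader("trace-id");
// String traceId = (h == null) ? null : new String(h.value(), StandardCharsets.UTF_8);</code></pre>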
<li>The broker configuration <code>group.initial.rebalance.delay.ms</code> was introduced.
This config specifies the time, in milliseconds, that the <code>GroupCoordinator</code> will delay the initial consumer rebalance.
The rebalance will be further delayed by the value of <code>group.initial.rebalance.delay.ms</code> as new members join the group, up to a maximum of <code>max.poll.interval.ms</code>.</li>
<li><code>org.apache.kafka.common.Cluster#partitionsForTopic</code>, <code>partitionsForNode</code> and <code>availablePartitionsForTopic</code> methods
will return an empty list instead of <code>null</code> (which is considered a bad practice) in case the metadata for the required topic does not exist.
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers">KIP-82</a>: ProduceRequest v3 introduces an array of <code>header</code> in the message protocol, containing <code>key</code> field and <code>value</code> field.</li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers">KIP-82</a>: FetchResponse v5 introduces an array of <code>header</code> in the message protocol, containing <code>key</code> field and <code>value</code> field.</li>
<h5><aid="upgrade_11_message_format"href="#upgrade_11_message_format">Notes on the new message format in 0.11.0</a></h5>
<p>The 0.11.0 message format includes several major enhancements in order to support better delivery semantics for the producer
(see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging">KIP-98</a>)
and improved replication fault tolerance
(see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation">KIP-101</a>).
Although the new format contains more information to make these improvements possible, we have made the batch format much
more efficient. As long as the number of messages per batch is more than 2, you can expect lower overall overhead. For smaller
batches, however, there may be a small performance impact. See <a href="bit.ly/kafka-eos-perf">here</a> for the results of our
initial performance analysis of the new message format. You can also find more detail on the message format in the documentation.
<li> Update server.properties file on all brokers and add the following properties:
<ul>
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0 or 0.10.1).</li>
<li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does; an illustrative example is shown after these steps.)</li>
</ul>
</li>
<li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
<li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.2. </li>
<li> If your previous message format is 0.10.0, change log.message.format.version to 0.10.2 (this is a no-op as the message format is the same for 0.10.0, 0.10.1 and 0.10.2).
If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li>
<li> Restart the brokers one by one for the new protocol version to take effect. </li>
<li> If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later,
then change log.message.format.version to 0.10.2 on each broker and restart them one by one. </li>
</ol>
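<p>For example (the versions here are illustrative, not from the original text; use the version your cluster is actually running), the properties added in the first step of this procedure might look like:</p>
<pre><code># server.properties on every broker, before the rolling upgrade to 0.10.2
inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0</code></pre>
<p>Once the whole cluster has been upgraded and verified, the same file is edited again to set <code>inter.broker.protocol.version=0.10.2</code> (and, only after all consumers are on 0.10.0 or later, <code>log.message.format.version=0.10.2</code>) before the corresponding rolling restarts.</p>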
<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
<li> Upgrading your Streams application from 0.10.1 to 0.10.2 does not require a broker upgrade.
A Kafka Streams 0.10.2 application can connect to 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed. </li>
<li> If you register custom metrics, you will need to update this code, because the <code>StreamsMetric</code> interface was changed. </li>
<h5><aid="upgrade_1020_streams_from_0100"href="#upgrade_1020_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.0 to 0.10.2 does require a <ahref="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.2 application can only connect to 0.10.2 or 0.10.1 brokers. </li>
<li> There are couple of API changes, that are not backward compatible (cf. <ahref="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for the first upgrade phase.
As an alternative, an offline upgrade is also possible.
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.2.2 </li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code></li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
</li>
<li> The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer <code>retries</code> default value was changed from 0 to 10. The internal Kafka Streams consumer <code>max.poll.interval.ms</code> default value was changed from 300000 to <code>Integer.MAX_VALUE</code>.
<li>The Zookeeper dependency was removed from the Streams API. The Streams API now uses the Kafka protocol to manage internal topics instead of
modifying Zookeeper directly. This eliminates the need for privileges to access Zookeeper directly and "StreamsConfig.ZOOKEEPER_CONFIG"
should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.</li>
<li>Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to
StreamsConfig class. Users should pay attention to the default values and set these if needed. For more details please refer to <a href="/{{version}}/documentation/#streamsconfigs">3.5 Kafka Streams Configs</a>.</li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update">KIP-88</a>: OffsetFetchRequest v2 supports retrieval of offsets for all topics if the <code>topics</code> array is set to <code>null</code>. </li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update">KIP-88</a>: OffsetFetchResponse v2 introduces a top-level <code>error_code</code> field. </li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic">KIP-103</a>: UpdateMetadataRequest v3 introduces a <code>listener_name</code> field to the elements of the <code>end_points</code> array. </li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-108%3A+Create+Topic+Policy">KIP-108</a>: CreateTopicsRequest v1 introduces a <code>validate_only</code> field. </li>
<li><ahref="https://cwiki.apache.org/confluence/display/KAFKA/KIP-108%3A+Create+Topic+Policy">KIP-108</a>: CreateTopicsResponse v1 introduces an <code>error_message</code> field to the elements of the <code>topic_errors</code> array. </li>
<li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)</li>
<li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
<li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0. </li>
<li> If your previous message format is 0.10.0, change log.message.format.version to 0.10.1 (this is a no-op as the message format is the same for both 0.10.0 and 0.10.1).
If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li>
<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
<h5><aid="upgrade_10_1_breaking"href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a></h5>
<ul>
<li> The log retention time is no longer based on last modified time of the log segments. Instead it will be based on the largest timestamp of the messages in a log segment.</li>
<li> The log rolling time no longer depends on the log segment create time. Instead it is now based on the timestamp in the messages. More specifically, if the timestamp of the first message in the segment is T, the log will be rolled when a new message has a timestamp greater than or equal to T + log.roll.ms </li>
<li> The open file handlers of 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li>
<li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. </li>
<li> Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
<li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers. </li>
<li> There are a couple of API changes that are not backward compatible (cf. <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for the first upgrade phase.
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code></li>
<li> The new Java consumer is no longer in beta and we recommend it for all new development. The old Scala consumers are still supported, but they will be deprecated in the next release
and will be removed in a future major release. </li>
<li> The <code>--new-consumer</code>/<code>--new.consumer</code> switch is no longer required to use tools like MirrorMaker and the Console Consumer with the new consumer; one simply
needs to pass a Kafka broker to connect to instead of the ZooKeeper ensemble. In addition, usage of the Console Consumer with the old consumer has been deprecated and it will be
removed in a future major release. </li>
<li> Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface. </li>
<li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
<li> Fetch responses have a size limit by default (50 MB for consumers and 10 MB for replication). The existing per partition limits also apply (1 MB for consumers
and replication). Note that neither of these limits is an absolute maximum as explained in the next point. </li>
<li> Consumers and replicas can make progress if a message larger than the response/partition size limit is found. More concretely, if the first message in the
first non-empty partition of the fetch is larger than either or both limits, the message will still be returned. </li>
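<p>As an illustration (these configuration names and defaults are mentioned here for context and are not part of the original text above), the consumer- and broker-side settings behind these limits are:</p>
<pre><code># consumer side (defaults shown)
fetch.max.bytes=52428800            # 50 MB per fetch response
max.partition.fetch.bytes=1048576   # 1 MB per partition

# broker side, for replication (defaults shown)
replica.fetch.response.max.bytes=10485760   # 10 MB per fetch response
replica.fetch.max.bytes=1048576             # 1 MB per partition</code></pre>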
<li> Overloaded constructors were added to <code>kafka.api.FetchRequest</code> and <code>kafka.javaapi.FetchRequest</code> to allow the caller to specify the
order of the partitions (since order is significant in v3). The previously existing constructors were deprecated and the partitions are shuffled before
the request is sent to avoid starvation issues. </li>
0.10.0.0 has <a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading) and possible <a href="#upgrade_10_performance_impact">performance impact following the upgrade</a>. By following the recommended rolling upgrade plan below, you guarantee no downtime and no performance impact during and following the upgrade.
<br>
Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
<li> Update server.properties file on all brokers and add the following properties:
<ul>
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0).</li>
<li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)</li>
<li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.0.0. NOTE: You shouldn't touch log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 </li>
<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
Reports from the Kafka community on the performance impact have shown CPU utilization going from 20% before to 100% after an upgrade, which forced an immediate upgrade of all clients to bring performance back to normal.
To avoid such message conversion before consumers are upgraded to 0.10.0.0, one can set log.message.format.version to 0.8.2 or 0.9.0 when upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the data to the old consumers. Once consumers are upgraded, one can change the message format to 0.10.0 on the broker and enjoy the new message format that includes new timestamp and improved compression.
The conversion is supported to ensure compatibility and can be useful to support a few apps that have not updated to newer clients yet, but is impractical to support all consumer traffic on even an overprovisioned cluster. Therefore, it is critical to avoid the message conversion as much as possible when brokers have been upgraded but the majority of clients have not.
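<p>For example (the versions here are illustrative, not from the original text), the staged setting on each broker might look like:</p>
<pre><code># while older consumers are still in use, keep the old on-disk format
log.message.format.version=0.9.0

# only after all consumers have been upgraded to 0.10.0.0 or later
log.message.format.version=0.10.0</code></pre>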
<li> Starting from Kafka 0.10.0.0, the message format version in Kafka is represented as the Kafka version. For example, message format 0.9.0 refers to the highest message version supported by Kafka 0.9.0. </li>
<li> Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages. </li>
<li> ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
<li> FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0 </li>
<li> MessageFormatter's package was changed from <code>kafka.tools</code> to <code>kafka.common</code></li>
<li> MessageReader's package was changed from <code>kafka.tools</code> to <code>kafka.common</code></li>
<li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>
<li> The 0.7 KafkaMigrationTool is no longer packaged with Kafka. If you need to migrate from 0.7 to 0.10.0, please migrate to 0.8 first and then follow the documented upgrade process to upgrade from 0.8 to 0.10.0. </li>
<li> The new consumer has standardized its APIs to accept <code>java.util.Collection</code> as the sequence type for method parameters. Existing code may have to be updated to work with the 0.10.0 client library. </li>
<li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read <a href="/{{version}}/documentation/streams">Streams documentation</a>.</li>
<li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
<li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li>
<li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li>
<li> The new consumer API has been marked stable. </li>
0.9.0.0 has <a href="#upgrade_9_breaking">potential breaking changes</a> (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker, downstream clusters should be upgraded first as well.
<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
<li> Broker IDs above 1000 are now reserved by default for automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker configuration property accordingly. </li>
<li> Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync. </li>
<li> Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync. </li>
<li> Compacted topics no longer accept messages without a key, and an exception is thrown by the producer if this is attempted. In 0.8.x, a message without a key would cause the log compaction thread to subsequently complain and quit (and stop compacting all compacted topics). </li>
<li> MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration. </li>
<li> Tools packaged under <em>org.apache.kafka.clients.tools.*</em> have been moved to <em>org.apache.kafka.tools.*</em>. All included scripts will still function as usual, only custom code directly importing these classes will be affected. </li>
<li> The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh. </li>
<li> The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with non-zero exit code on failure. </li>
<li> The kafka-topics.sh script (kafka.admin.TopicCommand) will now print a warning when topic names risk metric collisions due to the use of a '.' or '_' in the topic name, and error in the case of an actual collision. </li>
<li> The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the Java producer instead of the old Scala producer by default, and users have to specify 'old-producer' to use the old producer. </li>
<li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics. </li>
<li> Altering topic configuration from the kafka-topics.sh script (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality. </li>
<li> The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality. </li>
<li> The kafka.tools.ProducerPerformance class has been deprecated. Going forward, please use org.apache.kafka.tools.ProducerPerformance for this functionality (kafka-producer-perf-test.sh will also be changed to use the new class). </li>
<li> The producer config block.on.buffer.full has been deprecated and will be removed in a future release. Currently its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException but instead will use the max.block.ms value to block, after which it will throw a TimeoutException. If the block.on.buffer.full property is set to true explicitly, it will set max.block.ms to Long.MAX_VALUE and metadata.fetch.timeout.ms will not be honoured</li>
Release 0.7 is incompatible with newer releases. Major changes were made to the API, ZooKeeper data structures, protocol, and configuration in order to add replication (which was missing in 0.7). The upgrade from 0.7 to later versions requires a <a href="https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8">special tool</a> for migration. This migration can be done without downtime.