KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4768)

Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>
This commit is contained in:
Matthias J. Sax 2018-04-02 18:38:45 -07:00 committed by GitHub
parent 078a7acff8
commit 6a24f5b3fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1494 additions and 281 deletions

View File

@ -73,28 +73,50 @@ do
fi
done
for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
clients_lib_dir=$(dirname $0)/../clients/build/libs
streams_lib_dir=$(dirname $0)/../streams/build/libs
rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
else
clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
streams_lib_dir=$clients_lib_dir
rocksdb_lib_dir=$streams_lib_dir
fi
for file in "$clients_lib_dir"/kafka-clients*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
for file in "$streams_lib_dir"/kafka-streams*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
else
VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
fi
for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
for file in "$rocksdb_lib_dir"/rocksdb*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done

View File

@ -961,6 +961,54 @@ project(':streams:examples') {
}
}
project(':streams:upgrade-system-tests-0100') {
archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
dependencies {
testCompile libs.kafkaStreams_0100
}
systemTestLibs {
dependsOn testJar
}
}
project(':streams:upgrade-system-tests-0101') {
archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
dependencies {
testCompile libs.kafkaStreams_0101
}
systemTestLibs {
dependsOn testJar
}
}
project(':streams:upgrade-system-tests-0102') {
archivesBaseName = "kafka-streams-upgrade-system-tests-0102"
dependencies {
testCompile libs.kafkaStreams_0102
}
systemTestLibs {
dependsOn testJar
}
}
project(':streams:upgrade-system-tests-0110') {
archivesBaseName = "kafka-streams-upgrade-system-tests-0110"
dependencies {
testCompile libs.kafkaStreams_0110
}
systemTestLibs {
dependsOn testJar
}
}
project(':jmh-benchmarks') {
apply plugin: 'com.github.johnrengelman.shadow'

View File

@ -181,7 +181,7 @@
files="SmokeTestDriver.java"/>
<suppress checks="NPathComplexity"
files="KStreamKStreamJoinTest.java"/>
files="KStreamKStreamJoinTest.java|SmokeTestDriver.java"/>
<suppress checks="NPathComplexity"
files="KStreamKStreamLeftJoinTest.java"/>

View File

@ -16,7 +16,8 @@
*/
package org.apache.kafka.common.security.authenticator;
import java.util.Map;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.Mode;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
@ -25,9 +26,7 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.Mode;
import java.util.Map;
/**
* Callback handler for Sasl clients. The callbacks required for the SASL mechanism

View File

@ -34,28 +34,46 @@
</div>
<p>
If you want to upgrade from 0.11.0.x to 1.0.0 you don't need to do any code changes as the public API is fully backward compatible.
If you want to upgrade from 0.10.2.x or 0.11.0.x to 1.0.x you don't need to do any code changes as the public API is fully backward compatible.
However, some public APIs were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
See <a href="#streams_api_changes_100">below</a> a complete list of 1.0.0 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
See below a complete list of <a href="#streams_api_changes_100">1.0</a> and <a href="#streams_api_changes_0110">0.11.0</a> API
and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
Additionally, Streams API 1.0.x requires broker on-disk message format version 0.10 or higher; thus, you need to make sure that the message
format is configured correctly before you upgrade your Kafka Streams application.
</p>
<p>
If you want to upgrade from 0.10.2.x to 0.11.0 you don't need to do any code changes as the public API is fully backward compatible.
However, some configuration parameters were deprecated and thus it is recommended to update your code eventually to allow for future upgrades.
See <a href="#streams_api_changes_0110">below</a> a complete list of 0.11.0 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
If you want to upgrade from 0.10.1.x to 1.0.x see the Upgrade Sections for <a href="/{{version}}/documentation/#upgrade_1020_streams"><b>0.10.2</b></a>,
<a href="/{{version}}/documentation/#upgrade_1100_streams"><b>0.11.0</b></a>, and
<a href="/{{version}}/documentation/#upgrade_100_streams"><b>1.0</b></a>.
Note, that a brokers on-disk message format must be on version 0.10 or higher to run a Kafka Streams application version 1.0 or higher.
See below a complete list of <a href="#streams_api_changes_0102">0.10.2</a>, <a href="#streams_api_changes_0110">0.11.0</a>,
and <a href="#streams_api_changes_100">1.0</a> API and semantical changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
</p>
<p>
If you want to upgrade from 0.10.1.x to 0.10.2, see the <a href="/{{version}}/documentation/#upgrade_1020_streams"><b>Upgrade Section for 0.10.2</b></a>.
It highlights incompatible changes you need to consider to upgrade your code and application.
See <a href="#streams_api_changes_0102">below</a> a complete list of 0.10.2 API and semantic changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
</p>
<p>
If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams"><b>Upgrade Section for 0.10.1</b></a>.
It highlights incompatible changes you need to consider to upgrade your code and application.
See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
Upgrading from 0.10.0.x to 1.0.x directly is also possible.
Note, that a brokers must be on version 0.10.1 or higher and on-disk message format must be on version 0.10 or higher
to run a Kafka Streams application version 1.0 or higher.
See <a href="#streams_api_changes_0101">Streams API changes in 0.10.1</a>, <a href="#streams_api_changes_0102">Streams API changes in 0.10.2</a>,
<a href="#streams_api_changes_0110">Streams API changes in 0.11.0</a>, and <a href="#streams_api_changes_100">Streams API changes in 1.0</a>
for a complete list of API changes.
Upgrading to 1.0.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
As an alternative, an offline upgrade is also possible.
</p>
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 1.0.2</li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 1.0.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
<p> Upgrading from 0.10.0.x to 1.0.0 or 1.0.1 requires an offline upgrade (rolling bounce upgrade is not supported) </p>
<ul>
<li> stop all old (0.10.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and new jar file </li>
<li> restart all new (1.0.0 or 1.0.1) application instances </li>
</ul>
<h3><a id="streams_api_changes_100" href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3>

View File

@ -129,17 +129,77 @@
be used if the SaslHandshake request version is greater than 0. </li>
</ul>
<h5><a id="upgrade_100_streams" href="#upgrade_100_streams">Upgrading a 1.0.0 Kafka Streams Application</a></h5>
<h5><a id="upgrade_100_streams" href="#upgrade_100_streams">Upgrading a 0.11.0 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.11.0 to 1.0.0 does not require a broker upgrade.
A Kafka Streams 1.0.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
However, Kafka Streams 1.0 requires 0.10 message format or newer and does not work with older message formats. </li>
A Kafka Streams 1.0.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though).
However, Kafka Streams 1.0 requires 0.10 message format or newer and does not work with older message formats. </li>
<li> If you are monitoring on streams metrics, you will need make some changes to the metrics names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
<li> There are a few public APIs including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code> and <code>KStreamBuilder</code>, <code>TopologyBuilder</code> are being deprecated by new APIs.
We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_100">Streams API changes in 1.0.0</a> for more details. </li>
</ul>
<h5><a id="upgrade_100_streams_from_0102" href="#upgrade_100_streams_from_0102">Upgrading a 0.10.2 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.2 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> If you are monitoring on streams metrics, you will need make some changes to the metrics names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
<li> There are a few public APIs including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code> and <code>KStreamBuilder</code>, <code>TopologyBuilder</code> are being deprecated by new APIs.
We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
<li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replaced configure parameter as these configs are deprecated. </li>
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> for more details. </li>
</ul>
<h5><a id="upgrade_100_streams_from_0101" href="#upgrade_1100_streams_from_0101">Upgrading a 0.10.1 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.1 to 1.0 does not require a broker upgrade.
A Kafka Streams 1.0 application can connect to 1.0, 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> If you are monitoring on streams metrics, you will need make some changes to the metrics names in your reporting and monitoring code, because the metrics sensor hierarchy was changed. </li>
<li> There are a few public APIs including <code>ProcessorContext#schedule()</code>, <code>Processor#punctuate()</code> and <code>KStreamBuilder</code>, <code>TopologyBuilder</code> are being deprecated by new APIs.
We recommend making corresponding code changes, which should be very minor since the new APIs look quite similar, when you upgrade.
<li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replaced configure parameter as these configs are deprecated. </li>
<li> If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed. </li>
<li> If you register custom metrics, you will need to update this code, because the <code>StreamsMetric</code> interface was changed. </li>
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_100">Streams API changes in 1.0.0</a>,
<a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> and
<a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
</ul>
<h5><a id="upgrade_100_streams_from_0100" href="#upgrade_100_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.0 to 1.0 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 1.0 application can only connect to 0.1, 0.11.0, 0.10.2, or 0.10.1 brokers. </li>
<li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_100">Streams API changes in 1.0.0</a>,
<a href="/{{version}}/documentation/streams#streams_api_changes_0110">Streams API changes in 0.11.0</a>,
<a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a>, and
<a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> Upgrading from 0.10.0.x to 1.0.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
As an alternative, an offline upgrade is also possible.
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.11.0.3 </li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 1.0.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
</li>
<li> Upgrading from 0.10.0.x to 1.0.0 or 1.0.1 requires an offline upgrade (rolling bounce upgrade is not supported)
<ul>
<li> stop all old (0.10.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and new jar file </li>
<li> restart all new (1.0.0 or 1.0.1) application instances </li>
</ul>
</li>
</ul>
<h5><a id="upgrade_102_notable" href="#upgrade_102_notable">Notable changes in 1.0.2</a></h5>
<ul>
<li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
<li> See the <a href="/{{version}}/documentation/streams/upgrade-guide.html"><b>Kafka Streams upgrade guide</b></a> for details about this new config.
</ul>
<h4><a id="upgrade_11_0_0" href="#upgrade_11_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.11.0.0</a></h4>
<p>Kafka 0.11.0.0 introduces a new message format version as well as wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_1100_notable">notable changes in 0.11.0.0</a> before upgrading.
@ -188,11 +248,55 @@
<h5><a id="upgrade_1100_streams" href="#upgrade_1100_streams">Upgrading a 0.10.2 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.2 to 0.11.0 does not require a broker upgrade.
A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replaced configure parameter as these configs are deprecated. </li>
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> for more details. </li>
</ul>
<h5><a id="upgrade_1100_streams_from_0101" href="#upgrade_1100_streams_from_0101">Upgrading a 0.10.1 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.1 to 0.11.0 does not require a broker upgrade.
A Kafka Streams 0.11.0 application can connect to 0.11.0, 0.10.2 and 0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
<li> You need to recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> If you specify customized <code>key.serde</code>, <code>value.serde</code> and <code>timestamp.extractor</code> in configs, it is recommended to use their replaced configure parameter as these configs are deprecated. </li>
<li> If you use a custom (i.e., user implemented) timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed. </li>
<li> If you register custom metrics, you will need to update this code, because the <code>StreamsMetric</code> interface was changed. </li>
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0110">Streams API changes in 0.11.0</a> and
<a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
</ul>
<h5><a id="upgrade_1100_streams_from_0100" href="#upgrade_1100_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.0 to 0.11.0 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.11.0 application can only connect to 0.11.0, 0.10.2, or 0.10.1 brokers. </li>
<li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0110">Streams API changes in 0.11.0</a>,
<a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a>, and
<a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> Upgrading from 0.10.0.x to 0.11.0.3 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
As an alternative, an offline upgrade is also possible.
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.11.0.3 </li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
</li>
<li> Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an offline upgrade (rolling bounce upgrade is not supported)
<ul>
<li> stop all old (0.10.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and new jar file </li>
<li> restart all new (0.11.0.0 , 0.11.0.1, or 0.11.0.2) application instances </li>
</ul>
</li>
</ul>
<h5><a id="upgrade_1103_notable" href="#upgrade_1103_notable">Notable changes in 0.11.0.3</a></h5>
<ul>
<li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
<li> See the <a href="/{{version}}/documentation/streams/upgrade-guide.html"><b>Kafka Streams upgrade guide</b></a> for details about this new config.
</ul>
<h5><a id="upgrade_1100_notable" href="#upgrade_1100_notable">Notable changes in 0.11.0.0</a></h5>
<ul>
<li>Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to
@ -343,6 +447,35 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
<li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
</ul>
<h5><a id="upgrade_1020_streams_from_0100" href="#upgrade_1020_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
<ul>
<li> Upgrading your Streams application from 0.10.0 to 0.10.2 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.2 application can only connect to 0.10.2 or 0.10.1 brokers. </li>
<li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
As an alternative, an offline upgrade is also possible.
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.2.2 </li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
</li>
<li> Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported)
<ul>
<li> stop all old (0.10.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and new jar file </li>
<li> restart all new (0.10.2.0 or 0.10.2.1) application instances </li>
</ul>
</li>
</ul>
<h5><a id="upgrade_10202_notable" href="#upgrade_10202_notable">Notable changes in 0.10.2.2</a></h5>
<ul>
<li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
</ul>
<h5><a id="upgrade_10201_notable" href="#upgrade_10201_notable">Notable changes in 0.10.2.1</a></h5>
<ul>
<li> The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer <code>retries</code> default value was changed from 0 to 10. The internal Kafka Streams consumer <code>max.poll.interval.ms</code> default value was changed from 300000 to <code>Integer.MAX_VALUE</code>.
@ -421,6 +554,23 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
<li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers. </li>
<li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
<li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
As an alternative, an offline upgrade is also possible.
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
<li> bounce each instance of your application once </li>
<li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
<li> bounce each instance of your application once more to complete the upgrade </li>
</ul>
</li>
<li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
<ul>
<li> stop all old (0.10.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and new jar file </li>
<li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li>
</ul>
</li>
</ul>
<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>

View File

@ -59,6 +59,10 @@ versions += [
log4j: "1.2.17",
jopt: "5.0.4",
junit: "4.12",
kafka_0100: "0.10.0.1",
kafka_0101: "0.10.1.1",
kafka_0102: "0.10.2.1",
kafka_0110: "0.11.0.2",
lz4: "1.4",
metrics: "2.2.0",
// PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
@ -95,11 +99,15 @@ libs += [
jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
junit: "junit:junit:$versions.junit",
log4j: "log4j:log4j:$versions.log4j",
jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
junit: "junit:junit:$versions.junit",
kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
log4j: "log4j:log4j:$versions.log4j",
lz4: "org.lz4:lz4-java:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",

View File

@ -13,5 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100',
'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102', 'streams:upgrade-system-tests-0110',
'log4j-appender',
'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'

View File

@ -129,6 +129,11 @@ public class StreamsConfig extends AbstractConfig {
*/
public static final String PRODUCER_PREFIX = "producer.";
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
*/
public static final String UPGRADE_FROM_0100 = "0.10.0";
/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
*/
@ -280,6 +285,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface. This config is deprecated, use <code>" + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "</code> instead";
/** {@code upgrade.from} */
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
"Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
/**
* {@code value.serde}
* @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead.
@ -491,6 +501,12 @@ public class StreamsConfig extends AbstractConfig {
10 * 60 * 1000,
Importance.LOW,
STATE_CLEANUP_DELAY_MS_DOC)
.define(UPGRADE_FROM_CONFIG,
ConfigDef.Type.STRING,
null,
in(null, UPGRADE_FROM_0100),
Importance.LOW,
UPGRADE_FROM_DOC)
.define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
Type.LONG,
24 * 60 * 60 * 1000,
@ -712,6 +728,7 @@ public class StreamsConfig extends AbstractConfig {
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
// add configs required for stream partition assignor
consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));

View File

@ -175,6 +175,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
private String userEndPoint;
private int numStandbyReplicas;
private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
private Cluster metadataWithInternalTopics;
private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
@ -205,7 +207,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// Setting the logger with the passed in client thread name
logPrefix = String.format("stream-thread [%s] ", configs.get(CommonClientConfigs.CLIENT_ID_CONFIG));
final LogContext logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
log = logContext.logger(getClass());
final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
userMetadataVersion = 1;
}
Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
if (o == null) {
@ -266,7 +274,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
final Set<TaskId> previousActiveTasks = threadDataProvider.prevActiveTasks();
Set<TaskId> standbyTasks = threadDataProvider.cachedTasks();
standbyTasks.removeAll(previousActiveTasks);
SubscriptionInfo data = new SubscriptionInfo(threadDataProvider.processId(), previousActiveTasks, standbyTasks, this.userEndPoint);
SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, threadDataProvider.processId(), previousActiveTasks, standbyTasks, this.userEndPoint);
if (threadDataProvider.builder().sourceTopicPattern() != null &&
!threadDataProvider.builder().subscriptionUpdates().getUpdates().equals(topics)) {
@ -309,11 +317,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// construct the client metadata from the decoded subscription info
Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
Subscription subscription = entry.getValue();
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
final int usedVersion = info.version;
if (usedVersion < minUserMetadataVersion) {
minUserMetadataVersion = usedVersion;
}
// create the new client metadata if necessary
ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
@ -572,7 +585,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
}
// finally, encode the assignment before sending back to coordinator
assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode()));
}
}

View File

@ -55,7 +55,7 @@ public class AssignmentInfo {
this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
}
protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
public AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
Map<HostInfo, Set<TopicPartition>> hostState) {
this.version = version;
this.activeTasks = activeTasks;
@ -153,8 +153,7 @@ public class AssignmentInfo {
}
}
return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
return new AssignmentInfo(version, activeTasks, standbyTasks, hostStateToTopicPartitions);
} catch (IOException ex) {
throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}

View File

@ -31,7 +31,7 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
private static final int CURRENT_VERSION = 2;
public static final int CURRENT_VERSION = 2;
public final int version;
public final UUID processId;
@ -43,7 +43,7 @@ public class SubscriptionInfo {
this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
}
private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
public SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
this.version = version;
this.processId = processId;
this.prevTasks = prevTasks;

View File

@ -423,6 +423,7 @@ public class StreamsConfigTest {
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs));
}
@SuppressWarnings("deprecation")
@Test
public void shouldBeBackwardsCompatibleWithDeprecatedConfigs() {
final Properties props = minimalStreamsConfig();
@ -457,6 +458,7 @@ public class StreamsConfigTest {
assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
}
@SuppressWarnings("deprecation")
@Test
public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
final Properties props = minimalStreamsConfig();
@ -470,6 +472,7 @@ public class StreamsConfigTest {
}
}
@SuppressWarnings("deprecation")
@Test
public void shouldSpecifyCorrectKeySerdeClassOnError() {
final Properties props = minimalStreamsConfig();
@ -483,6 +486,7 @@ public class StreamsConfigTest {
}
}
@SuppressWarnings("deprecation")
@Test
public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
final Properties props = minimalStreamsConfig();
@ -496,6 +500,7 @@ public class StreamsConfigTest {
}
}
@SuppressWarnings("deprecation")
@Test
public void shouldSpecifyCorrectValueSerdeClassOnError() {
final Properties props = minimalStreamsConfig();
@ -518,9 +523,7 @@ public class StreamsConfigTest {
}
@Override
public void close() {
}
public void close() {}
@Override
public Serializer serializer() {

View File

@ -313,6 +313,4 @@ public class KStreamAggregationDedupIntegrationTest {
}
}

View File

@ -140,7 +140,7 @@ public class StreamPartitionAssignorTest {
final Set<TaskId> cachedTasks,
final UUID processId,
final PartitionGrouper partitionGrouper,
final InternalTopologyBuilder builder) throws NoSuchFieldException, IllegalAccessException {
final InternalTopologyBuilder builder) {
EasyMock.expect(threadDataProvider.name()).andReturn("name").anyTimes();
EasyMock.expect(threadDataProvider.prevActiveTasks()).andReturn(prevTasks).anyTimes();
EasyMock.expect(threadDataProvider.cachedTasks()).andReturn(cachedTasks).anyTimes();
@ -179,7 +179,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void testSubscription() throws Exception {
public void testSubscription() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@ -208,7 +208,7 @@ public class StreamPartitionAssignorTest {
@Test
public void testAssignBasic() throws Exception {
public void testAssignBasic() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@ -248,11 +248,9 @@ public class StreamPartitionAssignorTest {
// check assignment info
Set<TaskId> allActiveTasks = new HashSet<>();
// the first consumer
AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
allActiveTasks.addAll(info10.activeTasks);
Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
// the second consumer
AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
@ -272,7 +270,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() throws Exception {
public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1");
@ -340,7 +338,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void testAssignWithPartialTopology() throws Exception {
public void testAssignWithPartialTopology() {
Properties props = configProps();
props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class);
StreamsConfig config = new StreamsConfig(props);
@ -369,9 +367,8 @@ public class StreamPartitionAssignorTest {
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
// check assignment info
Set<TaskId> allActiveTasks = new HashSet<>();
AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
allActiveTasks.addAll(info10.activeTasks);
Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@ -379,7 +376,7 @@ public class StreamPartitionAssignorTest {
@Test
public void testAssignEmptyMetadata() throws Exception {
public void testAssignEmptyMetadata() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@ -409,9 +406,8 @@ public class StreamPartitionAssignorTest {
new HashSet<>(assignments.get("consumer10").partitions()));
// check assignment info
Set<TaskId> allActiveTasks = new HashSet<>();
AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
allActiveTasks.addAll(info10.activeTasks);
Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
assertEquals(0, allActiveTasks.size());
assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
@ -434,7 +430,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void testAssignWithNewTasks() throws Exception {
public void testAssignWithNewTasks() {
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addSource(null, "source3", null, null, null, "topic3");
@ -466,13 +462,9 @@ public class StreamPartitionAssignorTest {
// check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match
// also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and
// then later ones will be re-assigned to other hosts due to load balancing
Set<TaskId> allActiveTasks = new HashSet<>();
Set<TopicPartition> allPartitions = new HashSet<>();
AssignmentInfo info;
info = AssignmentInfo.decode(assignments.get("consumer10").userData());
allActiveTasks.addAll(info.activeTasks);
allPartitions.addAll(assignments.get("consumer10").partitions());
AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer10").userData());
Set<TaskId> allActiveTasks = new HashSet<>(info.activeTasks);
Set<TopicPartition> allPartitions = new HashSet<>(assignments.get("consumer10").partitions());
info = AssignmentInfo.decode(assignments.get("consumer11").userData());
allActiveTasks.addAll(info.activeTasks);
@ -487,7 +479,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void testAssignWithStates() throws Exception {
public void testAssignWithStates() {
String applicationId = "test";
builder.setApplicationId(applicationId);
builder.addSource(null, "source1", null, null, null, "topic1");
@ -576,7 +568,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void testAssignWithStandbyReplicas() throws Exception {
public void testAssignWithStandbyReplicas() {
Properties props = configProps();
props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
StreamsConfig config = new StreamsConfig(props);
@ -613,13 +605,10 @@ public class StreamPartitionAssignorTest {
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
Set<TaskId> allActiveTasks = new HashSet<>();
Set<TaskId> allStandbyTasks = new HashSet<>();
// the first consumer
AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
allActiveTasks.addAll(info10.activeTasks);
allStandbyTasks.addAll(info10.standbyTasks.keySet());
Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
Set<TaskId> allStandbyTasks = new HashSet<>(info10.standbyTasks.keySet());
// the second consumer
AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
@ -647,7 +636,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void testOnAssignment() throws Exception {
public void testOnAssignment() {
TopicPartition t2p3 = new TopicPartition("topic2", 3);
builder.addSource(null, "source1", null, null, null, "topic1");
@ -675,7 +664,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void testAssignWithInternalTopics() throws Exception {
public void testAssignWithInternalTopics() {
String applicationId = "test";
builder.setApplicationId(applicationId);
builder.addInternalTopic("topicX");
@ -706,7 +695,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
String applicationId = "test";
builder.setApplicationId(applicationId);
builder.addInternalTopic("topicX");
@ -741,9 +730,8 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
final String applicationId = "application-id";
builder.setApplicationId(applicationId);
public void shouldAddUserDefinedEndPointToSubscription() {
builder.setApplicationId("application-id");
builder.addSource(null, "source", null, null, null, "input");
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
builder.addSink("sink", "output", null, null, null, "processor");
@ -752,7 +740,8 @@ public class StreamPartitionAssignorTest {
mockThreadDataProvider(Collections.<TaskId>emptySet(),
Collections.<TaskId>emptySet(),
uuid1,
defaultPartitionGrouper, builder);
defaultPartitionGrouper,
builder);
configurePartitionAssignor(0, userEndPoint);
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
@ -760,7 +749,59 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldMapUserEndPointToTopicPartitions() throws Exception {
public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
final Set<TaskId> emptyTasks = Collections.emptySet();
subscriptions.put(
"consumer1",
new PartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
)
);
subscriptions.put(
"consumer2",
new PartitionAssignor.Subscription(
Collections.singletonList("topic1"),
new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
)
);
mockThreadDataProvider(
emptyTasks,
emptyTasks,
UUID.randomUUID(),
defaultPartitionGrouper,
builder);
configurePartitionAssignor(0, null);
final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
assertEquals(2, assignment.size());
assertEquals(1, AssignmentInfo.decode(assignment.get("consumer1").userData()).version);
assertEquals(1, AssignmentInfo.decode(assignment.get("consumer2").userData()).version);
}
@Test
public void shouldDownGradeSubscription() {
final Set<TaskId> emptyTasks = Collections.emptySet();
mockThreadDataProvider(
emptyTasks,
emptyTasks,
UUID.randomUUID(),
defaultPartitionGrouper,
builder);
configurationMap.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);
configurePartitionAssignor(0, null);
PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
assertEquals(1, SubscriptionInfo.decode(subscription.userData()).version);
}
@Test
public void shouldMapUserEndPointToTopicPartitions() {
final String applicationId = "application-id";
builder.setApplicationId(applicationId);
builder.addSource(null, "source", null, null, null, "topic1");
@ -790,7 +831,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
final String myEndPoint = "localhost";
final String applicationId = "application-id";
builder.setApplicationId(applicationId);
@ -821,7 +862,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
public void shouldExposeHostStateToTopicPartitionsOnAssignment() {
List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
final Map<HostInfo, Set<TopicPartition>> hostState =
Collections.singletonMap(new HostInfo("localhost", 80),
@ -837,7 +878,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldSetClusterMetadataOnAssignment() throws Exception {
public void shouldSetClusterMetadataOnAssignment() {
final List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
final Map<HostInfo, Set<TopicPartition>> hostState =
Collections.singletonMap(new HostInfo("localhost", 80),
@ -865,7 +906,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() throws Exception {
public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
final String applicationId = "application-id";
final StreamsBuilder builder = new StreamsBuilder();
@ -970,7 +1011,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception {
public void shouldUpdatePartitionHostInfoMapOnAssignment() {
final TopicPartition partitionOne = new TopicPartition("topic", 1);
final TopicPartition partitionTwo = new TopicPartition("topic", 2);
final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
@ -993,7 +1034,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldUpdateClusterMetadataOnAssignment() throws Exception {
public void shouldUpdateClusterMetadataOnAssignment() {
final TopicPartition topicOne = new TopicPartition("topic", 1);
final TopicPartition topicTwo = new TopicPartition("topic2", 2);
final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
@ -1015,7 +1056,7 @@ public class StreamPartitionAssignorTest {
}
@Test
public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
final String applicationId = "appId";
final StreamsBuilder builder = new StreamsBuilder();

View File

@ -64,10 +64,9 @@ public class AssignmentInfoTest {
assertEquals(oldVersion.activeTasks, decoded.activeTasks);
assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1
assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
assertEquals(1, decoded.version);
}
/**
* This is a clone of what the V1 encoding did. The encode method has changed for V2
* so it is impossible to test compatibility without having this

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -30,10 +31,13 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.io.File;
import java.util.Properties;
@ -47,7 +51,7 @@ public class SmokeTestClient extends SmokeTestUtil {
private Thread thread;
private boolean uncaughtException = false;
public SmokeTestClient(File stateDir, String kafka) {
public SmokeTestClient(final File stateDir, final String kafka) {
super();
this.stateDir = stateDir;
this.kafka = kafka;
@ -57,7 +61,7 @@ public class SmokeTestClient extends SmokeTestUtil {
streams = createKafkaStreams(stateDir, kafka);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
public void uncaughtException(final Thread t, final Throwable e) {
System.out.println("SMOKE-TEST-CLIENT-EXCEPTION");
uncaughtException = true;
e.printStackTrace();
@ -94,7 +98,7 @@ public class SmokeTestClient extends SmokeTestUtil {
}
}
private static KafkaStreams createKafkaStreams(File stateDir, String kafka) {
private static Properties getStreamsConfig(final File stateDir, final String kafka) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@ -109,25 +113,29 @@ public class SmokeTestClient extends SmokeTestUtil {
props.put(ProducerConfig.ACKS_CONFIG, "all");
//TODO remove this config or set to smaller value when KIP-91 is merged
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
return props;
}
StreamsBuilder builder = new StreamsBuilder();
Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
source.to(stringSerde, intSerde, "echo");
KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
private static KafkaStreams createKafkaStreams(final File stateDir, final String kafka) {
final StreamsBuilder builder = new StreamsBuilder();
final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
source.to("echo", Produced.with(stringSerde, intSerde));
final KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
public boolean test(final String key, final Integer value) {
return value == null || value != END;
}
});
data.process(SmokeTestUtil.printProcessorSupplier("data"));
// min
KGroupedStream<String, Integer>
groupedData =
final KGroupedStream<String, Integer> groupedData =
data.groupByKey(Serialized.with(stringSerde, intSerde));
groupedData.aggregate(
groupedData
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)))
.aggregate(
new Initializer<Integer>() {
public Integer apply() {
return Integer.MAX_VALUE;
@ -135,21 +143,24 @@ public class SmokeTestClient extends SmokeTestUtil {
},
new Aggregator<String, Integer, Integer>() {
@Override
public Integer apply(String aggKey, Integer value, Integer aggregate) {
public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
return (value < aggregate) ? value : aggregate;
}
},
TimeWindows.of(TimeUnit.DAYS.toMillis(1)),
intSerde, "uwin-min"
).toStream().map(
new Unwindow<String, Integer>()
).to(stringSerde, intSerde, "min");
Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min").withValueSerde(intSerde))
.toStream(new Unwindow<String, Integer>())
.to("min", Produced.with(stringSerde, intSerde));
KTable<String, Integer> minTable = builder.table("min", stringIntConsumed);
final KTable<String, Integer> minTable = builder.table(
"min",
Consumed.with(stringSerde, intSerde),
Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("minStoreName"));
minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
// max
groupedData.aggregate(
groupedData
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(2)))
.aggregate(
new Initializer<Integer>() {
public Integer apply() {
return Integer.MIN_VALUE;
@ -157,21 +168,24 @@ public class SmokeTestClient extends SmokeTestUtil {
},
new Aggregator<String, Integer, Integer>() {
@Override
public Integer apply(String aggKey, Integer value, Integer aggregate) {
public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
return (value > aggregate) ? value : aggregate;
}
},
TimeWindows.of(TimeUnit.DAYS.toMillis(2)),
intSerde, "uwin-max"
).toStream().map(
new Unwindow<String, Integer>()
).to(stringSerde, intSerde, "max");
Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
.toStream(new Unwindow<String, Integer>())
.to("max", Produced.with(stringSerde, intSerde));
KTable<String, Integer> maxTable = builder.table("max", stringIntConsumed);
final KTable<String, Integer> maxTable = builder.table(
"max",
Consumed.with(stringSerde, intSerde),
Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("maxStoreName"));
maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
// sum
groupedData.aggregate(
groupedData
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(2)))
.aggregate(
new Initializer<Long>() {
public Long apply() {
return 0L;
@ -179,70 +193,74 @@ public class SmokeTestClient extends SmokeTestUtil {
},
new Aggregator<String, Integer, Long>() {
@Override
public Long apply(String aggKey, Integer value, Long aggregate) {
public Long apply(final String aggKey, final Integer value, final Long aggregate) {
return (long) value + aggregate;
}
},
TimeWindows.of(TimeUnit.DAYS.toMillis(2)),
longSerde, "win-sum"
).toStream().map(
new Unwindow<String, Long>()
).to(stringSerde, longSerde, "sum");
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
.toStream(new Unwindow<String, Long>())
.to("sum", Produced.with(stringSerde, longSerde));
Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
// cnt
groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2)), "uwin-cnt")
.toStream().map(
new Unwindow<String, Long>()
).to(stringSerde, longSerde, "cnt");
KTable<String, Long> cntTable = builder.table("cnt", stringLongConsumed);
// cnt
groupedData
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(2)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("uwin-cnt"))
.toStream(new Unwindow<String, Long>())
.to("cnt", Produced.with(stringSerde, longSerde));
final KTable<String, Long> cntTable = builder.table(
"cnt",
Consumed.with(stringSerde, longSerde),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cntStoreName"));
cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
// dif
maxTable.join(minTable,
maxTable
.join(
minTable,
new ValueJoiner<Integer, Integer, Integer>() {
public Integer apply(Integer value1, Integer value2) {
public Integer apply(final Integer value1, final Integer value2) {
return value1 - value2;
}
}
).to(stringSerde, intSerde, "dif");
})
.toStream()
.to("dif", Produced.with(stringSerde, intSerde));
// avg
sumTable.join(
sumTable
.join(
cntTable,
new ValueJoiner<Long, Long, Double>() {
public Double apply(Long value1, Long value2) {
public Double apply(final Long value1, final Long value2) {
return (double) value1 / (double) value2;
}
}
).to(stringSerde, doubleSerde, "avg");
})
.toStream()
.to("avg", Produced.with(stringSerde, doubleSerde));
// test repartition
Agg agg = new Agg();
cntTable.groupBy(agg.selector(),
Serialized.with(stringSerde, longSerde)
).aggregate(agg.init(),
agg.adder(),
agg.remover(),
Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
).to(stringSerde, longSerde, "tagg");
final Agg agg = new Agg();
cntTable.groupBy(agg.selector(), Serialized.with(stringSerde, longSerde))
.aggregate(agg.init(), agg.adder(), agg.remover(),
Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
.toStream()
.to("tagg", Produced.with(stringSerde, longSerde));
final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(stateDir, kafka));
streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
public void uncaughtException(final Thread t, final Throwable e) {
System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
streamsClient.close(30, TimeUnit.SECONDS);
}
});
return streamsClient;
}
}

View File

@ -130,53 +130,65 @@ public class SmokeTestDriver extends SmokeTestUtil {
System.out.println("shutdown");
}
public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) {
public static Map<String, Set<Integer>> generate(final String kafka,
final int numKeys,
final int maxRecordsPerKey) {
return generate(kafka, numKeys, maxRecordsPerKey, true);
}
public static Map<String, Set<Integer>> generate(final String kafka,
final int numKeys,
final int maxRecordsPerKey,
final boolean autoTerminate) {
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
// the next 4 config values make sure that all records are produced with no loss and
// no duplicates
// the next 2 config values make sure that all records are produced with no loss and no duplicates
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 45000);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
int numRecordsProduced = 0;
Map<String, Set<Integer>> allData = new HashMap<>();
ValueList[] data = new ValueList[numKeys];
final Map<String, Set<Integer>> allData = new HashMap<>();
final ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
allData.put(data[i].key, new HashSet<Integer>());
}
Random rand = new Random();
final Random rand = new Random();
int remaining = data.length;
int remaining = 1; // dummy value must be positive if <autoTerminate> is false
if (autoTerminate) {
remaining = data.length;
}
List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
while (remaining > 0) {
int index = rand.nextInt(remaining);
String key = data[index].key;
final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
final String key = data[index].key;
int value = data[index].next();
if (value < 0) {
if (autoTerminate && value < 0) {
remaining--;
data[index] = data[remaining];
} else {
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
producer.send(record, new TestCallback(record, needRetry));
numRecordsProduced++;
allData.get(key).add(value);
if (numRecordsProduced % 100 == 0)
if (numRecordsProduced % 100 == 0) {
System.out.println(numRecordsProduced + " records produced");
}
Utils.sleep(2);
}
}

View File

@ -44,20 +44,15 @@ public class SmokeTestUtil {
public Processor<Object, Object> get() {
return new AbstractProcessor<Object, Object>() {
private int numRecordsProcessed = 0;
private ProcessorContext context;
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
this.context = context;
}
@Override
public void process(final Object key, final Object value) {
if (printOffset) {
System.out.println(">>> " + context.offset());
}
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println(System.currentTimeMillis());
@ -66,19 +61,19 @@ public class SmokeTestUtil {
}
@Override
public void punctuate(final long timestamp) { }
public void punctuate(final long timestamp) {}
@Override
public void close() { }
public void close() {}
};
}
};
}
public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, K> {
@Override
public KeyValue<K, V> apply(final Windowed<K> winKey, final V value) {
return new KeyValue<>(winKey.key(), value);
public K apply(final Windowed<K> winKey, final V value) {
return winKey.key();
}
}

View File

@ -23,7 +23,7 @@ import java.util.Set;
public class StreamsSmokeTest {
/**
* args ::= command kafka zookeeper stateDir
* args ::= command kafka zookeeper stateDir disableAutoTerminate
* command := "run" | "process"
*
* @param args
@ -32,11 +32,13 @@ public class StreamsSmokeTest {
String kafka = args[0];
String stateDir = args.length > 1 ? args[1] : null;
String command = args.length > 2 ? args[2] : null;
boolean disableAutoTerminate = args.length > 3;
System.out.println("StreamsTest instance started");
System.out.println("StreamsTest instance started (StreamsSmokeTest)");
System.out.println("command=" + command);
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
System.out.println("disableAutoTerminate=" + disableAutoTerminate);
switch (command) {
case "standalone":
@ -46,8 +48,12 @@ public class StreamsSmokeTest {
// this starts the driver (data generation and result verification)
final int numKeys = 10;
final int maxRecordsPerKey = 500;
Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
if (disableAutoTerminate) {
SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
} else {
Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
}
break;
case "process":
// this starts a KafkaStreams client

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];
final String stateDir = args[1];
final String upgradeFrom = args.length > 2 ? args[2] : null;
System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
System.out.println("upgradeFrom=" + upgradeFrom);
final StreamsBuilder builder = new StreamsBuilder();
final KStream dataStream = builder.stream("data");
dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
dataStream.to("echo");
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
if (upgradeFrom != null) {
config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
}
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}
});
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) {
if (args.length < 3) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : "")
+ (args.length > 1 ? args[1] : ""));
}
final String kafka = args[0];
final String zookeeper = args[1];
final String stateDir = args[2];
System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)");
System.out.println("kafka=" + kafka);
System.out.println("zookeeper=" + zookeeper);
System.out.println("stateDir=" + stateDir);
final KStreamBuilder builder = new KStreamBuilder();
final KStream dataStream = builder.stream("data");
dataStream.process(printProcessorSupplier());
dataStream.to("echo");
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}
});
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
return new ProcessorSupplier<K, V>() {
public Processor<K, V> get() {
return new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@Override
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data");
}
}
@Override
public void punctuate(final long timestamp) {}
@Override
public void close() {}
};
}
};
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class StreamsUpgradeTest {
/**
* This test cannot be run executed, as long as Kafka 0.10.1.2 is not released
*/
@SuppressWarnings("unchecked")
public static void main(final String[] args) {
if (args.length < 3) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : "")
+ (args.length > 1 ? args[1] : ""));
}
final String kafka = args[0];
final String zookeeper = args[1];
final String stateDir = args[2];
final String upgradeFrom = args.length > 3 ? args[3] : null;
System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)");
System.out.println("kafka=" + kafka);
System.out.println("zookeeper=" + zookeeper);
System.out.println("stateDir=" + stateDir);
System.out.println("upgradeFrom=" + upgradeFrom);
final KStreamBuilder builder = new KStreamBuilder();
final KStream dataStream = builder.stream("data");
dataStream.process(printProcessorSupplier());
dataStream.to("echo");
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
if (upgradeFrom != null) {
// TODO: because Kafka 0.10.1.2 is not released yet, thus `UPGRADE_FROM_CONFIG` is not available yet
//config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
config.setProperty("upgrade.from", upgradeFrom);
}
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}
});
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
return new ProcessorSupplier<K, V>() {
public Processor<K, V> get() {
return new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@Override
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data");
}
}
@Override
public void punctuate(final long timestamp) {}
@Override
public void close() {}
};
}
};
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class StreamsUpgradeTest {
/**
* This test cannot be run executed, as long as Kafka 0.10.2.2 is not released
*/
@SuppressWarnings("unchecked")
public static void main(final String[] args) {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];
final String stateDir = args[1];
final String upgradeFrom = args.length > 2 ? args[2] : null;
System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.2)");
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
System.out.println("upgradeFrom=" + upgradeFrom);
final KStreamBuilder builder = new KStreamBuilder();
final KStream dataStream = builder.stream("data");
dataStream.process(printProcessorSupplier());
dataStream.to("echo");
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
if (upgradeFrom != null) {
// TODO: because Kafka 0.10.2.2 is not released yet, thus `UPGRADE_FROM_CONFIG` is not available yet
//config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
config.setProperty("upgrade.from", upgradeFrom);
}
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}
});
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
return new ProcessorSupplier<K, V>() {
public Processor<K, V> get() {
return new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@Override
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data");
}
}
@Override
public void punctuate(final long timestamp) {}
@Override
public void close() {}
};
}
};
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class StreamsUpgradeTest {
/**
* This test cannot be run executed, as long as Kafka 0.11.0.3 is not released
*/
@SuppressWarnings("unchecked")
public static void main(final String[] args) {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];
final String stateDir = args[1];
final String upgradeFrom = args.length > 2 ? args[2] : null;
System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.11.0)");
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
System.out.println("upgradeFrom=" + upgradeFrom);
final KStreamBuilder builder = new KStreamBuilder();
final KStream dataStream = builder.stream("data");
dataStream.process(printProcessorSupplier());
dataStream.to("echo");
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
if (upgradeFrom != null) {
// TODO: because Kafka 0.11.0.3 is not released yet, thus `UPGRADE_FROM_CONFIG` is not available yet
//config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
config.setProperty("upgrade.from", upgradeFrom);
}
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}
});
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
return new ProcessorSupplier<K, V>() {
public Processor<K, V> get() {
return new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@Override
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data");
}
}
@Override
public void punctuate(final long timestamp) {}
@Override
public void close() {}
};
}
};
}
}

View File

@ -40,13 +40,13 @@ COPY ./ssh-config /root/.ssh/config
RUN ssh-keygen -q -t rsa -N '' -f /root/.ssh/id_rsa && cp -f /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
# Install binary test dependencies.
ENV MIRROR="http://mirrors.ocf.berkeley.edu/apache/"
RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "${MIRROR}kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "${MIRROR}kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
ENV MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "${MIRROR}/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "${MIRROR}/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
RUN mkdir -p "/opt/kafka-0.11.0.2" && curl -s "${MIRROR}/kafka_2.11-0.11.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.2"
# Set up the ducker user.
RUN useradd -ms /bin/bash ducker && mkdir -p /home/ducker/ && rsync -aiq /root/.ssh/ /home/ducker/.ssh && chown -R ducker /home/ducker/ /mnt/ && echo 'ducker ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers

View File

@ -20,6 +20,7 @@ from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
class StreamsTestBaseService(KafkaPathResolverMixin, Service):
@ -33,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
CLEAN_NODE_ENABLED = True
logs = {
"streams_log": {
"path": LOG_FILE,
@ -43,6 +46,114 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
"streams_stderr": {
"path": STDERR_FILE,
"collect_default": True},
"streams_log.0-1": {
"path": LOG_FILE + ".0-1",
"collect_default": True},
"streams_stdout.0-1": {
"path": STDOUT_FILE + ".0-1",
"collect_default": True},
"streams_stderr.0-1": {
"path": STDERR_FILE + ".0-1",
"collect_default": True},
"streams_log.0-2": {
"path": LOG_FILE + ".0-2",
"collect_default": True},
"streams_stdout.0-2": {
"path": STDOUT_FILE + ".0-2",
"collect_default": True},
"streams_stderr.0-2": {
"path": STDERR_FILE + ".0-2",
"collect_default": True},
"streams_log.0-3": {
"path": LOG_FILE + ".0-3",
"collect_default": True},
"streams_stdout.0-3": {
"path": STDOUT_FILE + ".0-3",
"collect_default": True},
"streams_stderr.0-3": {
"path": STDERR_FILE + ".0-3",
"collect_default": True},
"streams_log.0-4": {
"path": LOG_FILE + ".0-4",
"collect_default": True},
"streams_stdout.0-4": {
"path": STDOUT_FILE + ".0-4",
"collect_default": True},
"streams_stderr.0-4": {
"path": STDERR_FILE + ".0-4",
"collect_default": True},
"streams_log.0-5": {
"path": LOG_FILE + ".0-5",
"collect_default": True},
"streams_stdout.0-5": {
"path": STDOUT_FILE + ".0-5",
"collect_default": True},
"streams_stderr.0-5": {
"path": STDERR_FILE + ".0-5",
"collect_default": True},
"streams_log.0-6": {
"path": LOG_FILE + ".0-6",
"collect_default": True},
"streams_stdout.0-6": {
"path": STDOUT_FILE + ".0-6",
"collect_default": True},
"streams_stderr.0-6": {
"path": STDERR_FILE + ".0-6",
"collect_default": True},
"streams_log.1-1": {
"path": LOG_FILE + ".1-1",
"collect_default": True},
"streams_stdout.1-1": {
"path": STDOUT_FILE + ".1-1",
"collect_default": True},
"streams_stderr.1-1": {
"path": STDERR_FILE + ".1-1",
"collect_default": True},
"streams_log.1-2": {
"path": LOG_FILE + ".1-2",
"collect_default": True},
"streams_stdout.1-2": {
"path": STDOUT_FILE + ".1-2",
"collect_default": True},
"streams_stderr.1-2": {
"path": STDERR_FILE + ".1-2",
"collect_default": True},
"streams_log.1-3": {
"path": LOG_FILE + ".1-3",
"collect_default": True},
"streams_stdout.1-3": {
"path": STDOUT_FILE + ".1-3",
"collect_default": True},
"streams_stderr.1-3": {
"path": STDERR_FILE + ".1-3",
"collect_default": True},
"streams_log.1-4": {
"path": LOG_FILE + ".1-4",
"collect_default": True},
"streams_stdout.1-4": {
"path": STDOUT_FILE + ".1-4",
"collect_default": True},
"streams_stderr.1-4": {
"path": STDERR_FILE + ".1-4",
"collect_default": True},
"streams_log.1-5": {
"path": LOG_FILE + ".1-5",
"collect_default": True},
"streams_stdout.1-5": {
"path": STDOUT_FILE + ".1-5",
"collect_default": True},
"streams_stderr.1-5": {
"path": STDERR_FILE + ".1-5",
"collect_default": True},
"streams_log.1-6": {
"path": LOG_FILE + ".1-6",
"collect_default": True},
"streams_stdout.1-6": {
"path": STDOUT_FILE + ".1-6",
"collect_default": True},
"streams_stderr.1-6": {
"path": STDERR_FILE + ".1-6",
"collect_default": True},
}
def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None, user_test_args3=None):
@ -108,7 +219,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
def clean_node(self, node):
node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
if self.CLEAN_NODE_ENABLED:
node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
def start_cmd(self, node):
args = self.args.copy()
@ -170,7 +282,28 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
self.DISABLE_AUTO_TERMINATE = ""
def disable_auto_terminate(self):
self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
def start_cmd(self, node):
args = self.args.copy()
args['kafka'] = self.kafka.bootstrap_servers()
args['state_dir'] = self.PERSISTENT_ROOT
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j'] = self.LOG4J_CONFIG_FILE
args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(kafka)s %(state_dir)s %(user_test_args)s %(disable_auto_terminate)s" \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
return cmd
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
@ -211,3 +344,41 @@ class StreamsBrokerCompatibilityService(StreamsTestBaseService):
kafka,
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
eosEnabled)
class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsUpgradeTest",
"")
self.UPGRADE_FROM = ""
def set_version(self, kafka_streams_version):
self.KAFKA_STREAMS_VERSION = kafka_streams_version
def set_upgrade_from(self, upgrade_from):
self.UPGRADE_FROM = upgrade_from
def start_cmd(self, node):
args = self.args.copy()
args['kafka'] = self.kafka.bootstrap_servers()
if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
args['zk'] = self.kafka.zk.connect_setting()
else:
args['zk'] = ""
args['state_dir'] = self.PERSISTENT_ROOT
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j'] = self.LOG4J_CONFIG_FILE
args['version'] = self.KAFKA_STREAMS_VERSION
args['upgrade_from'] = self.UPGRADE_FROM
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
" %(kafka_run_class)s %(streams_class_name)s " \
" %(kafka)s %(zk)s %(state_dir)s %(user_test_args)s %(upgrade_from)s" \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
return cmd

View File

@ -15,21 +15,48 @@
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.mark import parametrize, ignore
from ducktape.mark import ignore, matrix, parametrize
from kafkatest.services.kafka import KafkaService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
from kafkatest.version import LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsUpgradeTestJobRunnerService
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, DEV_VERSION, KafkaVersion
import random
import time
broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(DEV_BRANCH)]
simple_upgrade_versions_metadata_version_2 = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(DEV_VERSION)]
class StreamsUpgradeTest(Test):
"""
Tests rolling upgrades and downgrades of the Kafka Streams library.
Test upgrading Kafka Streams (all version combination)
If metadata was changes, upgrade is more difficult
Metadata version was bumped in 0.10.1.0
"""
def __init__(self, test_context):
super(StreamsUpgradeTest, self).__init__(test_context)
self.topics = {
'echo' : { 'partitions': 5 },
'data' : { 'partitions': 5 },
}
def perform_broker_upgrade(self, to_version):
self.logger.info("First pass bounce - rolling broker upgrade")
for node in self.kafka.nodes:
self.kafka.stop_node(node)
node.version = KafkaVersion(to_version)
self.kafka.start_node(node)
@cluster(num_nodes=6)
@matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
def test_upgrade_downgrade_brokers(self, from_version, to_version):
"""
Start a smoke test client then perform rolling upgrades on the broker.
"""
if from_version == to_version:
return
self.replication = 3
self.partitions = 1
self.isr = 2
@ -55,45 +82,7 @@ class StreamsUpgradeTest(Test):
'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} }
}
def perform_streams_upgrade(self, to_version):
self.logger.info("First pass bounce - rolling streams upgrade")
# get the node running the streams app
node = self.processor1.node
self.processor1.stop()
# change it's version. This will automatically make it pick up a different
# JAR when it starts again
node.version = KafkaVersion(to_version)
self.processor1.start()
def perform_broker_upgrade(self, to_version):
self.logger.info("First pass bounce - rolling broker upgrade")
for node in self.kafka.nodes:
self.kafka.stop_node(node)
node.version = KafkaVersion(to_version)
self.kafka.start_node(node)
@cluster(num_nodes=6)
@parametrize(from_version=str(LATEST_0_10_1), to_version=str(DEV_BRANCH))
@parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
@parametrize(from_version=str(LATEST_0_10_1), to_version=str(LATEST_0_11_0))
@parametrize(from_version=str(LATEST_0_10_2), to_version=str(LATEST_0_11_0))
@parametrize(from_version=str(LATEST_0_11_0), to_version=str(LATEST_0_10_2))
@parametrize(from_version=str(DEV_BRANCH), to_version=str(LATEST_0_10_2))
def test_upgrade_downgrade_streams(self, from_version, to_version):
"""
Start a smoke test client, then abort (kill -9) and restart it a few times.
Ensure that all records are delivered.
Note, that just like tests/core/upgrade_test.py, a prerequisite for this test to succeed
if the inclusion of all parametrized versions of kafka in kafka/vagrant/base.sh
(search for get_kafka()). For streams in particular, that means that someone has manually
copies the kafka-stream-$version-test.jar in the right S3 bucket as shown in base.sh.
"""
# Setup phase
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
@ -108,47 +97,6 @@ class StreamsUpgradeTest(Test):
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
self.driver.start()
self.processor1.start()
time.sleep(15)
self.perform_streams_upgrade(to_version)
time.sleep(15)
self.driver.wait()
self.driver.stop()
self.processor1.stop()
node = self.driver.node
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
@cluster(num_nodes=6)
@parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
def test_upgrade_brokers(self, from_version, to_version):
"""
Start a smoke test client then perform rolling upgrades on the broker.
"""
# Setup phase
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
# number of nodes needs to be >= 3 for the smoke test
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
self.kafka.start()
# allow some time for topics to be created
time.sleep(10)
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
self.driver.start()
self.processor1.start()
@ -165,3 +113,240 @@ class StreamsUpgradeTest(Test):
node = self.driver.node
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
@matrix(from_version=simple_upgrade_versions_metadata_version_2, to_version=simple_upgrade_versions_metadata_version_2)
def test_simple_upgrade_downgrade(self, from_version, to_version):
"""
Starts 3 KafkaStreams instances with <old_version>, and upgrades one-by-one to <new_version>
"""
if from_version == to_version:
return
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
self.kafka.start()
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.driver.disable_auto_terminate()
self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
self.driver.start()
self.start_all_nodes_with(from_version)
self.processors = [self.processor1, self.processor2, self.processor3]
counter = 1
random.seed()
# upgrade one-by-one via rolling bounce
random.shuffle(self.processors)
for p in self.processors:
p.CLEAN_NODE_ENABLED = False
self.do_rolling_bounce(p, "", to_version, counter)
counter = counter + 1
# shutdown
self.driver.stop()
self.driver.wait()
random.shuffle(self.processors)
for p in self.processors:
node = p.node
with node.account.monitor_log(p.STDOUT_FILE) as monitor:
p.stop()
monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
timeout_sec=60,
err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
self.driver.stop()
#@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released
#@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released
#@parametrize(new_version=str(LATEST_0_11_0)) we cannot run this test until Kafka 0.11.0.3 is released
@parametrize(new_version=str(DEV_VERSION))
def test_metadata_upgrade(self, new_version):
"""
Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version>
"""
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
self.kafka.start()
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.driver.disable_auto_terminate()
self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
self.driver.start()
self.start_all_nodes_with(str(LATEST_0_10_0))
self.processors = [self.processor1, self.processor2, self.processor3]
counter = 1
random.seed()
# first rolling bounce
random.shuffle(self.processors)
for p in self.processors:
p.CLEAN_NODE_ENABLED = False
self.do_rolling_bounce(p, "0.10.0", new_version, counter)
counter = counter + 1
# second rolling bounce
random.shuffle(self.processors)
for p in self.processors:
self.do_rolling_bounce(p, "", new_version, counter)
counter = counter + 1
# shutdown
self.driver.stop()
self.driver.wait()
random.shuffle(self.processors)
for p in self.processors:
node = p.node
with node.account.monitor_log(p.STDOUT_FILE) as monitor:
p.stop()
monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
timeout_sec=60,
err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
self.driver.stop()
def start_all_nodes_with(self, version):
# start first with <version>
self.prepare_for(self.processor1, version)
node1 = self.processor1.node
with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
self.processor1.start()
log_monitor.wait_until("Kafka version : " + version,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
# start second with <version>
self.prepare_for(self.processor2, version)
node2 = self.processor2.node
with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
self.processor2.start()
log_monitor.wait_until("Kafka version : " + version,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
first_monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
second_monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
# start third with <version>
self.prepare_for(self.processor3, version)
node3 = self.processor3.node
with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
self.processor3.start()
log_monitor.wait_until("Kafka version : " + version,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
first_monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
second_monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
third_monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
@staticmethod
def prepare_for(processor, version):
processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
if version == str(DEV_VERSION):
processor.set_version("") # set to TRUNK
else:
processor.set_version(version)
def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
first_other_processor = None
second_other_processor = None
for p in self.processors:
if p != processor:
if first_other_processor is None:
first_other_processor = p
else:
second_other_processor = p
node = processor.node
first_other_node = first_other_processor.node
second_other_node = second_other_processor.node
# stop processor and wait for rebalance of others
with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
processor.stop()
first_other_monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
second_other_monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
if upgrade_from == "": # upgrade disabled -- second round of rolling bounces
roll_counter = ".1-" # second round of rolling bounces
else:
roll_counter = ".0-" # first round of rolling boundes
node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False)
if new_version == str(DEV_VERSION):
processor.set_version("") # set to TRUNK
else:
processor.set_version(new_version)
processor.set_upgrade_from(upgrade_from)
grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
processor.start()
log_monitor.wait_until("Kafka version : " + new_version,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
first_other_monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
second_other_monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
monitor.wait_until("processed 100 records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))

View File

@ -61,6 +61,7 @@ def get_version(node=None):
return DEV_BRANCH
DEV_BRANCH = KafkaVersion("dev")
DEV_VERSION = KafkaVersion("1.0.2-SNAPSHOT")
# 0.8.2.X versions
V_0_8_2_1 = KafkaVersion("0.8.2.1")
@ -91,7 +92,9 @@ LATEST_0_10 = LATEST_0_10_2
# 0.11.0.0 versions
V_0_11_0_0 = KafkaVersion("0.11.0.0")
LATEST_0_11_0 = V_0_11_0_0
V_0_11_0_1 = KafkaVersion("0.11.0.1")
V_0_11_0_2 = KafkaVersion("0.11.0.2")
LATEST_0_11_0 = V_0_11_0_2
LATEST_0_11 = LATEST_0_11_0
# 1.0.0 versions

View File

@ -93,8 +93,8 @@ get_kafka 0.10.1.1 2.11
chmod a+rw /opt/kafka-0.10.1.1
get_kafka 0.10.2.1 2.11
chmod a+rw /opt/kafka-0.10.2.1
get_kafka 0.11.0.0 2.11
chmod a+rw /opt/kafka-0.11.0.0
get_kafka 0.11.0.2 2.11
chmod a+rw /opt/kafka-0.11.0.2
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local