mirror of https://github.com/apache/kafka.git
MINOR: Enable ignored upgrade system tests - trunk (#5605)
Removed ignore annotations from the upgrade tests. This PR includes the following changes for updating the upgrade tests: * Uploaded new versions 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, and 2.0.0 (in the associated scala versions) to kafka-packages * Update versions in version.py, Dockerfile, base.sh * Added new versions to StreamsUpgradeTest.test_upgrade_downgrade_brokers including version 2.0.0 * Added new versions StreamsUpgradeTest.test_simple_upgrade_downgrade test excluding version 2.0.0 * Version 2.0.0 is excluded from the streams upgrade/downgrade test as StreamsConfig needs an update for the new version, requiring a KIP. Once the community votes the KIP in, a minor follow-up PR can be pushed to add the 2.0.0 version to the upgrade test. * Fixed minor bug in kafka-run-class.sh for classpath in upgrade/downgrade tests across versions. * Follow on PRs for 0.10.2x, 0.11.0x, 1.0.x, 1.1.x, and 2.0.x will be pushed soon with the same updates required for the specific version. Reviewers: Eno Thereska <eno.thereska@gmail.com>, John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
466d89306e
commit
b74e7e407c
|
@ -107,7 +107,7 @@ else
|
||||||
for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
|
for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
|
||||||
do
|
do
|
||||||
if should_include_file "$file"; then
|
if should_include_file "$file"; then
|
||||||
CLASSPATH="$CLASSPATH":"$file"
|
CLASSPATH="$file":"$CLASSPATH"
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
fi
|
fi
|
||||||
|
|
12
build.gradle
12
build.gradle
|
@ -1189,6 +1189,18 @@ project(':streams:upgrade-system-tests-11') {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
project(':streams:upgrade-system-tests-20') {
|
||||||
|
archivesBaseName = "kafka-streams-upgrade-system-tests-20"
|
||||||
|
|
||||||
|
dependencies {
|
||||||
|
testCompile libs.kafkaStreams_20
|
||||||
|
}
|
||||||
|
|
||||||
|
systemTestLibs {
|
||||||
|
dependsOn testJar
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
project(':jmh-benchmarks') {
|
project(':jmh-benchmarks') {
|
||||||
|
|
||||||
apply plugin: 'com.github.johnrengelman.shadow'
|
apply plugin: 'com.github.johnrengelman.shadow'
|
||||||
|
|
|
@ -66,10 +66,11 @@ versions += [
|
||||||
junit: "4.12",
|
junit: "4.12",
|
||||||
kafka_0100: "0.10.0.1",
|
kafka_0100: "0.10.0.1",
|
||||||
kafka_0101: "0.10.1.1",
|
kafka_0101: "0.10.1.1",
|
||||||
kafka_0102: "0.10.2.1",
|
kafka_0102: "0.10.2.2",
|
||||||
kafka_0110: "0.11.0.2",
|
kafka_0110: "0.11.0.3",
|
||||||
kafka_10: "1.0.1",
|
kafka_10: "1.0.2",
|
||||||
kafka_11: "1.1.0",
|
kafka_11: "1.1.1",
|
||||||
|
kafka_20: "2.0.0",
|
||||||
lz4: "1.4.1",
|
lz4: "1.4.1",
|
||||||
mavenArtifact: "3.5.3",
|
mavenArtifact: "3.5.3",
|
||||||
metrics: "2.2.0",
|
metrics: "2.2.0",
|
||||||
|
@ -120,6 +121,7 @@ libs += [
|
||||||
kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
|
kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
|
||||||
kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
|
kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
|
||||||
kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
|
kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
|
||||||
|
kafkaStreams_20: "org.apache.kafka:kafka-streams:$versions.kafka_20",
|
||||||
log4j: "log4j:log4j:$versions.log4j",
|
log4j: "log4j:log4j:$versions.log4j",
|
||||||
lz4: "org.lz4:lz4-java:$versions.lz4",
|
lz4: "org.lz4:lz4-java:$versions.lz4",
|
||||||
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
|
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
|
||||||
|
|
|
@ -15,6 +15,6 @@
|
||||||
|
|
||||||
include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples',
|
include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples',
|
||||||
'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
|
'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
|
||||||
'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
|
'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11', 'streams:upgrade-system-tests-20',
|
||||||
'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
|
'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
|
||||||
'connect:basic-auth-extension', 'jmh-benchmarks'
|
'connect:basic-auth-extension', 'jmh-benchmarks'
|
||||||
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.streams.tests;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
import org.apache.kafka.streams.KafkaStreams;
|
||||||
|
import org.apache.kafka.streams.StreamsBuilder;
|
||||||
|
import org.apache.kafka.streams.StreamsConfig;
|
||||||
|
import org.apache.kafka.streams.kstream.KStream;
|
||||||
|
import org.apache.kafka.streams.processor.AbstractProcessor;
|
||||||
|
import org.apache.kafka.streams.processor.Processor;
|
||||||
|
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||||
|
import org.apache.kafka.streams.processor.ProcessorSupplier;
|
||||||
|
|
||||||
|
public class StreamsUpgradeTest {
|
||||||
|
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
if (args.length < 2) {
|
||||||
|
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
|
||||||
|
+ (args.length > 0 ? args[0] : ""));
|
||||||
|
}
|
||||||
|
final String kafka = args[0];
|
||||||
|
final String propFileName = args.length > 1 ? args[1] : null;
|
||||||
|
|
||||||
|
final Properties streamsProperties = Utils.loadProps(propFileName);
|
||||||
|
|
||||||
|
System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.0)");
|
||||||
|
System.out.println("kafka=" + kafka);
|
||||||
|
System.out.println("props=" + streamsProperties);
|
||||||
|
|
||||||
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
|
final KStream dataStream = builder.stream("data");
|
||||||
|
dataStream.process(printProcessorSupplier());
|
||||||
|
dataStream.to("echo");
|
||||||
|
|
||||||
|
final Properties config = new Properties();
|
||||||
|
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
|
||||||
|
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
||||||
|
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
|
||||||
|
config.putAll(streamsProperties);
|
||||||
|
|
||||||
|
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
|
||||||
|
streams.start();
|
||||||
|
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
streams.close();
|
||||||
|
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
|
||||||
|
System.out.flush();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
|
||||||
|
return new ProcessorSupplier<K, V>() {
|
||||||
|
public Processor<K, V> get() {
|
||||||
|
return new AbstractProcessor<K, V>() {
|
||||||
|
private int numRecordsProcessed = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(final ProcessorContext context) {
|
||||||
|
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
|
||||||
|
numRecordsProcessed = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(final K key, final V value) {
|
||||||
|
numRecordsProcessed++;
|
||||||
|
if (numRecordsProcessed % 100 == 0) {
|
||||||
|
System.out.println("processed " + numRecordsProcessed + " records from topic=data");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
|
@ -43,32 +43,22 @@ RUN ssh-keygen -q -t rsa -N '' -f /root/.ssh/id_rsa && cp -f /root/.ssh/id_rsa.p
|
||||||
ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
|
ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
|
||||||
RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
|
RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
|
||||||
RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
|
RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
|
||||||
RUN mkdir -p "/opt/kafka-0.10.0.0" && chmod a+rw /opt/kafka-0.10.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.0"
|
|
||||||
RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
|
RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
|
||||||
RUN mkdir -p "/opt/kafka-0.10.1.0" && chmod a+rw /opt/kafka-0.10.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.0"
|
|
||||||
RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
|
RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
|
||||||
RUN mkdir -p "/opt/kafka-0.10.2.0" && chmod a+rw /opt/kafka-0.10.2.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.0"
|
RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.2"
|
||||||
RUN mkdir -p "/opt/kafka-0.10.2.1" && chmod a+rw /opt/kafka-0.10.2.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
|
RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3"
|
||||||
RUN mkdir -p "/opt/kafka-0.11.0.0" && chmod a+rw /opt/kafka-0.11.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
|
RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2"
|
||||||
RUN mkdir -p "/opt/kafka-0.11.0.1" && chmod a+rw /opt/kafka-0.11.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.1"
|
RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1"
|
||||||
RUN mkdir -p "/opt/kafka-0.11.0.2" && chmod a+rw /opt/kafka-0.11.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.2"
|
RUN mkdir -p "/opt/kafka-2.0.0" && chmod a+rw /opt/kafka-2.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.0"
|
||||||
RUN mkdir -p "/opt/kafka-1.0.0" && chmod a+rw /opt/kafka-1.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.0"
|
|
||||||
RUN mkdir -p "/opt/kafka-1.0.1" && chmod a+rw /opt/kafka-1.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.1"
|
|
||||||
RUN mkdir -p "/opt/kafka-1.1.0" && chmod a+rw /opt/kafka-1.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.0"
|
|
||||||
|
|
||||||
# Streams test dependencies
|
# Streams test dependencies
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.0-test.jar" -o /opt/kafka-0.10.0.0/libs/kafka-streams-0.10.0.0-test.jar
|
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
|
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.0-test.jar" -o /opt/kafka-0.10.1.0/libs/kafka-streams-0.10.1.0-test.jar
|
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar
|
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.0-test.jar" -o /opt/kafka-0.10.2.0/libs/kafka-streams-0.10.2.0-test.jar
|
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2.2/libs/kafka-streams-0.10.2.2-test.jar
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.1-test.jar" -o /opt/kafka-0.10.2.1/libs/kafka-streams-0.10.2.1-test.jar
|
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.0-test.jar" -o /opt/kafka-0.11.0.0/libs/kafka-streams-0.11.0.0-test.jar
|
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.1-test.jar" -o /opt/kafka-0.11.0.1/libs/kafka-streams-0.11.0.1-test.jar
|
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.2-test.jar" -o /opt/kafka-0.11.0.2/libs/kafka-streams-0.11.0.2-test.jar
|
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.0-test.jar" -o /opt/kafka-2.0.0/libs/kafka-streams-2.0.0-test.jar
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.0-test.jar" -o /opt/kafka-1.0.0/libs/kafka-streams-1.0.0-test.jar
|
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.1-test.jar" -o /opt/kafka-1.0.1/libs/kafka-streams-1.0.1-test.jar
|
|
||||||
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.0-test.jar" -o /opt/kafka-1.1.0/libs/kafka-streams-1.1.0-test.jar
|
|
||||||
|
|
||||||
# The version of Kibosh to use for testing.
|
# The version of Kibosh to use for testing.
|
||||||
# If you update this, also update vagrant/base.sy
|
# If you update this, also update vagrant/base.sy
|
||||||
|
|
|
@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until
|
||||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||||
from kafkatest.services.kafka import KafkaConfig
|
from kafkatest.services.kafka import KafkaConfig
|
||||||
from kafkatest.services.monitor.jmx import JmxMixin
|
from kafkatest.services.monitor.jmx import JmxMixin
|
||||||
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1
|
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0
|
||||||
|
|
||||||
STATE_DIR = "state.dir"
|
STATE_DIR = "state.dir"
|
||||||
|
|
||||||
|
@ -455,7 +455,8 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
|
||||||
|
|
||||||
def start_cmd(self, node):
|
def start_cmd(self, node):
|
||||||
args = self.args.copy()
|
args = self.args.copy()
|
||||||
if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]:
|
if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2),
|
||||||
|
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0)]:
|
||||||
args['kafka'] = self.kafka.bootstrap_servers()
|
args['kafka'] = self.kafka.bootstrap_servers()
|
||||||
else:
|
else:
|
||||||
args['kafka'] = ""
|
args['kafka'] = ""
|
||||||
|
|
|
@ -15,24 +15,24 @@
|
||||||
|
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
from ducktape.mark import ignore, matrix
|
from ducktape.mark import ignore
|
||||||
|
from ducktape.mark import matrix
|
||||||
from ducktape.mark.resource import cluster
|
from ducktape.mark.resource import cluster
|
||||||
from ducktape.tests.test import Test
|
from ducktape.tests.test import Test
|
||||||
from kafkatest.services.kafka import KafkaService
|
from kafkatest.services.kafka import KafkaService
|
||||||
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsUpgradeTestJobRunnerService
|
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsUpgradeTestJobRunnerService
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, DEV_BRANCH, DEV_VERSION, KafkaVersion
|
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, DEV_BRANCH, DEV_VERSION, KafkaVersion
|
||||||
|
|
||||||
# broker 0.10.0 is not compatible with newer Kafka Streams versions
|
# broker 0.10.0 is not compatible with newer Kafka Streams versions
|
||||||
broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(DEV_BRANCH)]
|
broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(DEV_BRANCH)]
|
||||||
|
|
||||||
metadata_1_versions = [str(LATEST_0_10_0)]
|
metadata_1_versions = [str(LATEST_0_10_0)]
|
||||||
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
|
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
|
||||||
# we can add the following versions to `backward_compatible_metadata_2_versions` after the corresponding
|
# once 0.10.1.2 is available backward_compatible_metadata_2_versions
|
||||||
# bug-fix release 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, and 1.1.1 are available:
|
# can be replaced with metadata_2_versions
|
||||||
# str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)
|
backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
|
||||||
backward_compatible_metadata_2_versions = []
|
metadata_3_or_higher_versions = [str(LATEST_2_0), str(DEV_VERSION)]
|
||||||
metadata_3_versions = [str(DEV_VERSION)]
|
|
||||||
|
|
||||||
class StreamsUpgradeTest(Test):
|
class StreamsUpgradeTest(Test):
|
||||||
"""
|
"""
|
||||||
|
@ -125,7 +125,6 @@ class StreamsUpgradeTest(Test):
|
||||||
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
|
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
|
||||||
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
|
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
|
||||||
|
|
||||||
@ignore
|
|
||||||
@matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
|
@matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
|
||||||
def test_simple_upgrade_downgrade(self, from_version, to_version):
|
def test_simple_upgrade_downgrade(self, from_version, to_version):
|
||||||
"""
|
"""
|
||||||
|
@ -178,8 +177,8 @@ class StreamsUpgradeTest(Test):
|
||||||
self.driver.stop()
|
self.driver.stop()
|
||||||
|
|
||||||
@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
|
@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
|
||||||
@matrix(from_version=metadata_1_versions, to_version=metadata_3_versions)
|
@matrix(from_version=metadata_1_versions, to_version=metadata_3_or_higher_versions)
|
||||||
@matrix(from_version=metadata_2_versions, to_version=metadata_3_versions)
|
@matrix(from_version=metadata_2_versions, to_version=metadata_3_or_higher_versions)
|
||||||
def test_metadata_upgrade(self, from_version, to_version):
|
def test_metadata_upgrade(self, from_version, to_version):
|
||||||
"""
|
"""
|
||||||
Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
|
Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
|
||||||
|
|
|
@ -14,9 +14,8 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
from kafkatest.utils import kafkatest_version
|
|
||||||
|
|
||||||
from distutils.version import LooseVersion
|
from distutils.version import LooseVersion
|
||||||
|
from kafkatest.utils import kafkatest_version
|
||||||
|
|
||||||
|
|
||||||
class KafkaVersion(LooseVersion):
|
class KafkaVersion(LooseVersion):
|
||||||
|
@ -86,7 +85,8 @@ LATEST_0_10_1 = V_0_10_1_1
|
||||||
# 0.10.2.x versions
|
# 0.10.2.x versions
|
||||||
V_0_10_2_0 = KafkaVersion("0.10.2.0")
|
V_0_10_2_0 = KafkaVersion("0.10.2.0")
|
||||||
V_0_10_2_1 = KafkaVersion("0.10.2.1")
|
V_0_10_2_1 = KafkaVersion("0.10.2.1")
|
||||||
LATEST_0_10_2 = V_0_10_2_1
|
V_0_10_2_2 = KafkaVersion("0.10.2.2")
|
||||||
|
LATEST_0_10_2 = V_0_10_2_2
|
||||||
|
|
||||||
LATEST_0_10 = LATEST_0_10_2
|
LATEST_0_10 = LATEST_0_10_2
|
||||||
|
|
||||||
|
@ -94,17 +94,20 @@ LATEST_0_10 = LATEST_0_10_2
|
||||||
V_0_11_0_0 = KafkaVersion("0.11.0.0")
|
V_0_11_0_0 = KafkaVersion("0.11.0.0")
|
||||||
V_0_11_0_1 = KafkaVersion("0.11.0.1")
|
V_0_11_0_1 = KafkaVersion("0.11.0.1")
|
||||||
V_0_11_0_2 = KafkaVersion("0.11.0.2")
|
V_0_11_0_2 = KafkaVersion("0.11.0.2")
|
||||||
LATEST_0_11_0 = V_0_11_0_2
|
V_0_11_0_3 = KafkaVersion("0.11.0.3")
|
||||||
|
LATEST_0_11_0 = V_0_11_0_3
|
||||||
LATEST_0_11 = LATEST_0_11_0
|
LATEST_0_11 = LATEST_0_11_0
|
||||||
|
|
||||||
# 1.0.x versions
|
# 1.0.x versions
|
||||||
V_1_0_0 = KafkaVersion("1.0.0")
|
V_1_0_0 = KafkaVersion("1.0.0")
|
||||||
V_1_0_1 = KafkaVersion("1.0.1")
|
V_1_0_1 = KafkaVersion("1.0.1")
|
||||||
LATEST_1_0 = V_1_0_1
|
V_1_0_2 = KafkaVersion("1.0.2")
|
||||||
|
LATEST_1_0 = V_1_0_2
|
||||||
|
|
||||||
# 1.1.x versions
|
# 1.1.x versions
|
||||||
V_1_1_0 = KafkaVersion("1.1.0")
|
V_1_1_0 = KafkaVersion("1.1.0")
|
||||||
LATEST_1_1 = V_1_1_0
|
V_1_1_1 = KafkaVersion("1.1.1")
|
||||||
|
LATEST_1_1 = V_1_1_1
|
||||||
|
|
||||||
# 2.0.x versions
|
# 2.0.x versions
|
||||||
V_2_0_0 = KafkaVersion("2.0.0")
|
V_2_0_0 = KafkaVersion("2.0.0")
|
||||||
|
|
|
@ -112,14 +112,16 @@ get_kafka 0.10.0.1 2.11
|
||||||
chmod a+rw /opt/kafka-0.10.0.1
|
chmod a+rw /opt/kafka-0.10.0.1
|
||||||
get_kafka 0.10.1.1 2.11
|
get_kafka 0.10.1.1 2.11
|
||||||
chmod a+rw /opt/kafka-0.10.1.1
|
chmod a+rw /opt/kafka-0.10.1.1
|
||||||
get_kafka 0.10.2.1 2.11
|
get_kafka 0.10.2.2 2.11
|
||||||
chmod a+rw /opt/kafka-0.10.2.1
|
chmod a+rw /opt/kafka-0.10.2.2
|
||||||
get_kafka 0.11.0.2 2.11
|
get_kafka 0.11.0.3 2.11
|
||||||
chmod a+rw /opt/kafka-0.11.0.2
|
chmod a+rw /opt/kafka-0.11.0.3
|
||||||
get_kafka 1.0.1 2.11
|
get_kafka 1.0.2 2.11
|
||||||
chmod a+rw /opt/kafka-1.0.1
|
chmod a+rw /opt/kafka-1.0.2
|
||||||
get_kafka 1.1.0 2.11
|
get_kafka 1.1.1 2.11
|
||||||
chmod a+rw /opt/kafka-1.1.0
|
chmod a+rw /opt/kafka-1.1.1
|
||||||
|
get_kafka 2.0.0 2.12
|
||||||
|
chmod a+rw /opt/kafka-2.0.0
|
||||||
|
|
||||||
|
|
||||||
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
|
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
|
||||||
|
|
Loading…
Reference in New Issue