MINOR: Remove sleep calls and ignore annotation from streams upgrade test (#6046)

The StreamsUpgradeTest::test_upgrade_downgrade_brokers used sleep calls in the test which led to flaky test performance and as a result, we placed an @ignore annotation on the test. This PR uses log events instead of the sleep calls hence we can now remove the @ignore setting.

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bill Bejeck 2019-01-07 02:03:54 -05:00 committed by Guozhang Wang
parent 1f3cb6c4ef
commit 404bdef08d
8 changed files with 186 additions and 44 deletions

View File

@ -1207,6 +1207,18 @@ project(':streams:upgrade-system-tests-20') {
} }
} }
project(':streams:upgrade-system-tests-21') {
archivesBaseName = "kafka-streams-upgrade-system-tests-21"
dependencies {
testCompile libs.kafkaStreams_21
}
systemTestLibs {
dependsOn testJar
}
}
project(':jmh-benchmarks') { project(':jmh-benchmarks') {
apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'com.github.johnrengelman.shadow'

View File

@ -71,7 +71,8 @@ versions += [
kafka_0110: "0.11.0.3", kafka_0110: "0.11.0.3",
kafka_10: "1.0.2", kafka_10: "1.0.2",
kafka_11: "1.1.1", kafka_11: "1.1.1",
kafka_20: "2.0.0", kafka_20: "2.0.1",
kafka_21: "2.1.0",
lz4: "1.5.0", lz4: "1.5.0",
mavenArtifact: "3.6.0", mavenArtifact: "3.6.0",
metrics: "2.2.0", metrics: "2.2.0",
@ -126,6 +127,7 @@ libs += [
kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10", kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11", kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
kafkaStreams_20: "org.apache.kafka:kafka-streams:$versions.kafka_20", kafkaStreams_20: "org.apache.kafka:kafka-streams:$versions.kafka_20",
kafkaStreams_21: "org.apache.kafka:kafka-streams:$versions.kafka_21",
log4j: "log4j:log4j:$versions.log4j", log4j: "log4j:log4j:$versions.log4j",
lz4: "org.lz4:lz4-java:$versions.lz4", lz4: "org.lz4:lz4-java:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics", metrics: "com.yammer.metrics:metrics-core:$versions.metrics",

View File

@ -16,5 +16,5 @@
include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples', include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples',
'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102', 'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11', 'streams:upgrade-system-tests-20', 'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11', 'streams:upgrade-system-tests-20',
'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'streams:upgrade-system-tests-21' , 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
'connect:basic-auth-extension', 'jmh-benchmarks' 'connect:basic-auth-extension', 'jmh-benchmarks'

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import java.util.Properties;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.1)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
final KStream dataStream = builder.stream("data");
dataStream.process(printProcessorSupplier());
dataStream.to("echo");
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}
});
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
return new ProcessorSupplier<K, V>() {
public Processor<K, V> get() {
return new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@Override
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data");
}
}
@Override
public void close() {}
};
}
};
}
}

View File

@ -49,7 +49,8 @@ RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl -s
RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3" RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.3"
RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2" RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.2"
RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1" RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.1.1"
RUN mkdir -p "/opt/kafka-2.0.0" && chmod a+rw /opt/kafka-2.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.0" RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.0.1"
RUN mkdir -p "/opt/kafka-2.1.0" && chmod a+rw /opt/kafka-2.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.1.0"
# Streams test dependencies # Streams test dependencies
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
@ -58,7 +59,8 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o /opt/kafka-0.10.2
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o /opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.0-test.jar" -o /opt/kafka-2.0.0/libs/kafka-streams-2.0.0-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.0-test.jar" -o /opt/kafka-2.1.0/libs/kafka-streams-2.1.0-test.jar
# The version of Kibosh to use for testing. # The version of Kibosh to use for testing.
# If you update this, also update vagrant/base.sy # If you update this, also update vagrant/base.sy

View File

@ -15,17 +15,19 @@
import random import random
import time import time
from ducktape.mark import ignore
from ducktape.mark import matrix from ducktape.mark import matrix
from ducktape.mark.resource import cluster from ducktape.mark.resource import cluster
from ducktape.tests.test import Test from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsUpgradeTestJobRunnerService from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
StreamsUpgradeTestJobRunnerService
from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, DEV_BRANCH, DEV_VERSION, KafkaVersion from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, DEV_BRANCH, DEV_VERSION, KafkaVersion
# broker 0.10.0 is not compatible with newer Kafka Streams versions # broker 0.10.0 is not compatible with newer Kafka Streams versions
broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(DEV_BRANCH)] broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(DEV_BRANCH)]
metadata_1_versions = [str(LATEST_0_10_0)] metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
@ -38,7 +40,8 @@ class StreamsUpgradeTest(Test):
""" """
Test upgrading Kafka Streams (all version combination) Test upgrading Kafka Streams (all version combination)
If metadata was changes, upgrade is more difficult If metadata was changes, upgrade is more difficult
Metadata version was bumped in 0.10.1.0 Metadata version was bumped in 0.10.1.0 and
subsequently bumped in 2.0.0
""" """
def __init__(self, test_context): def __init__(self, test_context):
@ -50,6 +53,8 @@ class StreamsUpgradeTest(Test):
self.leader = None self.leader = None
self.leader_counter = {} self.leader_counter = {}
processed_msg = "processed [0-9]* records"
def perform_broker_upgrade(self, to_version): def perform_broker_upgrade(self, to_version):
self.logger.info("First pass bounce - rolling broker upgrade") self.logger.info("First pass bounce - rolling broker upgrade")
for node in self.kafka.nodes: for node in self.kafka.nodes:
@ -57,7 +62,6 @@ class StreamsUpgradeTest(Test):
node.version = KafkaVersion(to_version) node.version = KafkaVersion(to_version)
self.kafka.start_node(node) self.kafka.start_node(node)
@ignore
@cluster(num_nodes=6) @cluster(num_nodes=6)
@matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions) @matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
def test_upgrade_downgrade_brokers(self, from_version, to_version): def test_upgrade_downgrade_brokers(self, from_version, to_version):
@ -69,6 +73,7 @@ class StreamsUpgradeTest(Test):
return return
self.replication = 3 self.replication = 3
self.num_kafka_nodes = 3
self.partitions = 1 self.partitions = 1
self.isr = 2 self.isr = 2
self.topics = { self.topics = {
@ -99,31 +104,48 @@ class StreamsUpgradeTest(Test):
self.zk.start() self.zk.start()
# number of nodes needs to be >= 3 for the smoke test # number of nodes needs to be >= 3 for the smoke test
self.kafka = KafkaService(self.test_context, num_nodes=3, self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes,
zk=self.zk, version=KafkaVersion(from_version), topics=self.topics) zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
self.kafka.start() self.kafka.start()
# allow some time for topics to be created # allow some time for topics to be created
time.sleep(10) wait_until(lambda: self.get_topics_count() >= (len(self.topics) * self.num_kafka_nodes),
timeout_sec=60,
err_msg="Broker did not create all topics in 60 seconds ")
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
self.driver.start()
self.processor1.start()
time.sleep(15)
self.perform_broker_upgrade(to_version) processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor:
self.driver.start()
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
processor.start()
monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node))
connected_message = "Discovered group coordinator"
with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
self.perform_broker_upgrade(to_version)
log_monitor.wait_until(connected_message,
timeout_sec=120,
err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
stdout_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node.account))
driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED',
timeout_sec=180,
err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account))
time.sleep(15)
self.driver.wait()
self.driver.stop() self.driver.stop()
processor.stop()
self.processor1.stop() processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
node = self.driver.node
node.account.ssh("grep -E 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
@matrix(from_version=metadata_2_versions, to_version=metadata_2_versions) @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
def test_simple_upgrade_downgrade(self, from_version, to_version): def test_simple_upgrade_downgrade(self, from_version, to_version):
@ -163,7 +185,6 @@ class StreamsUpgradeTest(Test):
# shutdown # shutdown
self.driver.stop() self.driver.stop()
self.driver.wait()
random.shuffle(self.processors) random.shuffle(self.processors)
for p in self.processors: for p in self.processors:
@ -174,8 +195,6 @@ class StreamsUpgradeTest(Test):
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
self.driver.stop()
@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions) @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
@matrix(from_version=metadata_1_versions, to_version=metadata_3_or_higher_versions) @matrix(from_version=metadata_1_versions, to_version=metadata_3_or_higher_versions)
@matrix(from_version=metadata_2_versions, to_version=metadata_3_or_higher_versions) @matrix(from_version=metadata_2_versions, to_version=metadata_3_or_higher_versions)
@ -219,7 +238,6 @@ class StreamsUpgradeTest(Test):
# shutdown # shutdown
self.driver.stop() self.driver.stop()
self.driver.wait()
random.shuffle(self.processors) random.shuffle(self.processors)
for p in self.processors: for p in self.processors:
@ -230,8 +248,6 @@ class StreamsUpgradeTest(Test):
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
self.driver.stop()
def test_version_probing_upgrade(self): def test_version_probing_upgrade(self):
""" """
Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version" Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
@ -276,7 +292,6 @@ class StreamsUpgradeTest(Test):
# shutdown # shutdown
self.driver.stop() self.driver.stop()
self.driver.wait()
random.shuffle(self.processors) random.shuffle(self.processors)
for p in self.processors: for p in self.processors:
@ -287,8 +302,6 @@ class StreamsUpgradeTest(Test):
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
self.driver.stop()
def update_leader(self): def update_leader(self):
self.leader = None self.leader = None
retries = 10 retries = 10
@ -329,7 +342,7 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str, log_monitor.wait_until(kafka_version_str,
timeout_sec=60, timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account)) err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
monitor.wait_until("processed 100 records from topic", monitor.wait_until("processed [0-9]* records from topic",
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account)) err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
@ -343,10 +356,10 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str, log_monitor.wait_until(kafka_version_str,
timeout_sec=60, timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account)) err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
first_monitor.wait_until("processed 100 records from topic", first_monitor.wait_until("processed [0-9]* records from topic",
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account)) err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
second_monitor.wait_until("processed 100 records from topic", second_monitor.wait_until("processed [0-9]* records from topic",
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account)) err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
@ -361,13 +374,13 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str, log_monitor.wait_until(kafka_version_str,
timeout_sec=60, timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account)) err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
first_monitor.wait_until("processed 100 records from topic", first_monitor.wait_until("processed [0-9]* records from topic",
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account)) err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
second_monitor.wait_until("processed 100 records from topic", second_monitor.wait_until("processed [0-9]* records from topic",
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account)) err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
third_monitor.wait_until("processed 100 records from topic", third_monitor.wait_until("processed [0-9]* records from topic",
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account)) err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
@ -582,3 +595,11 @@ class StreamsUpgradeTest(Test):
found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True)) found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
if len(found) > 0: if len(found) > 0:
raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'") raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'")
def get_topics_count(self):
count = 0
for node in self.kafka.nodes:
topic_list = self.kafka.list_topics("placeholder", node)
for topic in topic_list:
count += 1
return count

View File

@ -111,4 +111,9 @@ LATEST_1_1 = V_1_1_1
# 2.0.x versions # 2.0.x versions
V_2_0_0 = KafkaVersion("2.0.0") V_2_0_0 = KafkaVersion("2.0.0")
LATEST_2_0 = V_2_0_0 V_2_0_1 = KafkaVersion("2.0.1")
LATEST_2_0 = V_2_0_1
# 2.1.x versions
V_2_1_0 = KafkaVersion("2.1.0")
LATEST_2_1 = V_2_1_0

View File

@ -120,8 +120,10 @@ get_kafka 1.0.2 2.11
chmod a+rw /opt/kafka-1.0.2 chmod a+rw /opt/kafka-1.0.2
get_kafka 1.1.1 2.11 get_kafka 1.1.1 2.11
chmod a+rw /opt/kafka-1.1.1 chmod a+rw /opt/kafka-1.1.1
get_kafka 2.0.0 2.12 get_kafka 2.0.1 2.12
chmod a+rw /opt/kafka-2.0.0 chmod a+rw /opt/kafka-2.0.1
get_kafka 2.1.0 2.12
chmod a+rw /opt/kafka-2.1.0
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local # For EC2 nodes, we want to use /mnt, which should have the local disk. On local