MINOR: Tighten up metadata upgrade test (#6531)

Reviewers: Bill Bejeck <bbejeck@gmail.com>
This commit is contained in:
Guozhang Wang 2019-04-05 12:50:42 -07:00 committed by GitHub
parent 844120c601
commit 4aa2cfe467
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 40 additions and 37 deletions

View File

@@ -49,7 +49,7 @@ public class SmokeTestUtil {
@Override
public void init(final ProcessorContext context) {
super.init(context);
System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
}

View File

@@ -83,7 +83,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
System.out.println("[0.10.0] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}

View File

@@ -86,7 +86,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
System.out.println("[0.10.1] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}

View File

@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
System.out.println("[0.10.2] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}

View File

@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
System.out.println("[0.11.0] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}

View File

@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
System.out.println("[1.0] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}

View File

@@ -80,7 +80,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
System.out.println("[1.1] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}

View File

@@ -77,7 +77,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
System.out.println("[2.0] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}

View File

@@ -77,7 +77,7 @@ public class StreamsUpgradeTest {
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=data taskId=" + context.taskId());
System.out.println("[2.1] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}

View File

@@ -159,7 +159,7 @@ class StreamsUpgradeTest(Test):
processor.start()
monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node))
err_msg="Never saw output '%s' on " % self.processed_msg + str(processor.node))
connected_message = "Discovered group coordinator"
with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
@@ -382,9 +382,9 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
monitor.wait_until("processed [0-9]* records from topic",
monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
# start second with <version>
self.prepare_for(self.processor2, version)
@@ -395,15 +395,16 @@ class StreamsUpgradeTest(Test):
self.processor2.start()
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
first_monitor.wait_until("processed [0-9]* records from topic",
err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
first_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
second_monitor.wait_until("processed [0-9]* records from topic",
err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
second_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
# start third with <version>
# start third with <version>
self.prepare_for(self.processor3, version)
node3 = self.processor3.node
with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
@@ -413,16 +414,17 @@ class StreamsUpgradeTest(Test):
self.processor3.start()
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
first_monitor.wait_until("processed [0-9]* records from topic",
err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
first_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
second_monitor.wait_until("processed [0-9]* records from topic",
err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
second_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
third_monitor.wait_until("processed [0-9]* records from topic",
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
third_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
@staticmethod
def prepare_for(processor, version):
@@ -452,12 +454,12 @@ class StreamsUpgradeTest(Test):
with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
processor.stop()
first_other_monitor.wait_until("processed 100 records from topic",
first_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
second_other_monitor.wait_until("processed 100 records from topic",
err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
second_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
if upgrade_from is None: # upgrade disabled -- second round of rolling bounces
@@ -484,24 +486,25 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
first_other_monitor.wait_until("processed 100 records from topic",
err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
first_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
second_other_monitor.wait_until("processed 100 records from topic",
second_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
monitor.wait_until("processed 100 records from topic",
monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
err_msg="Never saw output '%s' on " % self.processed_msg + str(node.account))
def do_rolling_bounce(self, processor, counter, current_generation):
first_other_processor = None