MINOR: Fix Streams EOS system tests (#4572)

Avoid loosing log/stdout/stderr files on restart
Reenables tests

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2018-02-16 13:13:13 -08:00 committed by GitHub
parent 7303f41dc1
commit 0b3b6049f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 10 additions and 8 deletions

View File

@ -154,12 +154,18 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
class StreamsEosTestBaseService(StreamsTestBaseService):
"""Base class for Streams EOS Test services providing some common settings and functionality"""
clean_node_enabled = True
def __init__(self, test_context, kafka, command):
super(StreamsEosTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsEosTest",
command)
def clean_node(self, node):
if self.clean_node_enabled:
super.clean_node(self, node)
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
@ -180,12 +186,10 @@ class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")

View File

@ -20,7 +20,6 @@ from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
class StreamsEosTest(KafkaTest):
"""
Test of Kafka Streams exactly-once semantics
@ -39,7 +38,6 @@ class StreamsEosTest(KafkaTest):
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
self.test_context = test_context
@ignore
@cluster(num_nodes=9)
def test_rebalance_simple(self):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
@ -47,7 +45,6 @@ class StreamsEosTest(KafkaTest):
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@ignore
@cluster(num_nodes=9)
def test_rebalance_complex(self):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
@ -63,6 +60,8 @@ class StreamsEosTest(KafkaTest):
self.driver.start()
processor1.clean_node_enabled = False
self.add_streams(processor1)
self.add_streams2(processor1, processor2)
self.add_streams3(processor1, processor2, processor3)
@ -79,7 +78,6 @@ class StreamsEosTest(KafkaTest):
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
@ignore
@cluster(num_nodes=9)
def test_failure_and_recovery(self):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
@ -87,7 +85,6 @@ class StreamsEosTest(KafkaTest):
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@ignore
@cluster(num_nodes=9)
def test_failure_and_recovery_complex(self):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
@ -103,6 +100,8 @@ class StreamsEosTest(KafkaTest):
self.driver.start()
processor1.clean_node_enabled = False
self.add_streams(processor1)
self.add_streams2(processor1, processor2)
self.add_streams3(processor1, processor2, processor3)
@ -159,7 +158,6 @@ class StreamsEosTest(KafkaTest):
self.wait_for_startup(monitor1, keep_alive_processor1)
def wait_for_startup(self, monitor, processor):
self.wait_for(monitor, processor, "StateChange: RUNNING -> REBALANCING")
self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
self.wait_for(monitor, processor, "processed 500 records from topic=data")