diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py index adea2ea5ce8..3209b253696 100644 --- a/tests/kafkatest/tests/streams/streams_optimized_test.py +++ b/tests/kafkatest/tests/streams/streams_optimized_test.py @@ -15,6 +15,7 @@ import time from ducktape.tests.test import Test +from ducktape.utils.util import wait_until from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StreamsOptimizedUpgradeTestService from kafkatest.services.streams import StreamsResetter @@ -65,30 +66,41 @@ class StreamsOptimizedTest(Test): processors = [processor1, processor2, processor3] - # produce records continually during the test + self.logger.info("produce records continually during the test") self.producer.start() - # start all processors unoptimized + self.logger.info("start all processors unoptimized") for processor in processors: self.set_topics(processor) processor.CLEAN_NODE_ENABLED = False self.verify_running_repartition_topic_count(processor, 4) + self.logger.info("verify unoptimized") self.verify_processing(processors, verify_individual_operations=False) + self.logger.info("stop unoptimized") stop_processors(processors, self.stopped_message) + self.logger.info("reset") self.reset_application() + for processor in processors: + processor.node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + ".1", allow_fail=False) + processor.node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + ".1", allow_fail=False) + processor.node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + ".1", allow_fail=False) + processor.node.account.ssh("mv " + processor.CONFIG_FILE + " " + processor.CONFIG_FILE + ".1", allow_fail=False) - # start again with topology optimized + self.logger.info("start again with topology optimized") for processor in processors: processor.OPTIMIZED_CONFIG = 'all' 
self.verify_running_repartition_topic_count(processor, 1) + self.logger.info("verify optimized") self.verify_processing(processors, verify_individual_operations=True) + self.logger.info("stop optimized") stop_processors(processors, self.stopped_message) + self.logger.info("teardown") self.producer.stop() self.kafka.stop() self.zookeeper.stop() @@ -110,34 +122,25 @@ class StreamsOptimizedTest(Test): % repartition_topic_count + str(processor.node.account)) def verify_processing(self, processors, verify_individual_operations): + # This test previously had logic to account for skewed assignments, in which not all processors may + # receive active assignments. I don't think this will happen anymore, but keep an eye out if we see + # test failures here. If that does resurface, note that the prior implementation was not correct. + # A better approach would be to make sure we see processing of each partition across the whole cluster + # instead of just expecting to see each node perform some processing. 
for processor in processors: - if not self.all_source_subtopology_tasks(processor): - if verify_individual_operations: - for operation in self.operation_pattern.split('\|'): - self.do_verify(processor, operation) - else: - self.do_verify(processor, self.operation_pattern) + if verify_individual_operations: + for operation in self.operation_pattern.split('\|'): + self.do_verify(processor, operation) else: - self.logger.info("Skipping processor %s with all source tasks" % processor.node.account) + self.do_verify(processor, self.operation_pattern) def do_verify(self, processor, pattern): self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % pattern) - with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: - monitor.wait_until(pattern, - timeout_sec=60, - err_msg="Never saw processing of %s " % pattern + str(processor.node.account)) - - def all_source_subtopology_tasks(self, processor): - retries = 0 - while retries < 5: - found = list(processor.node.account.ssh_capture("sed -n 's/.*current active tasks: \[\(\(0_[0-9], \)\{3\}0_[0-9]\)\].*/\1/p' %s" % processor.LOG_FILE, allow_fail=True)) - self.logger.info("Returned %s from assigned task check" % found) - if len(found) > 0: - return True - retries += 1 - time.sleep(1) - - return False + self.logger.info(list(processor.node.account.ssh_capture("ls -lh %s" % (processor.STDOUT_FILE), allow_fail=True))) + wait_until( + lambda: processor.node.account.ssh("grep --max-count 1 '%s' %s" % (pattern, processor.STDOUT_FILE), allow_fail=True) == 0, + timeout_sec=60, err_msg="Never saw processing of %s %s" % (pattern, processor.node.account) + ) def set_topics(self, processor): processor.INPUT_TOPIC = self.input_topic