diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index bf9ada9af1f..cd8c8f91173 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -24,6 +24,7 @@ import time def file_exists(node, file): + """Quick and dirty check for existence of remote file.""" try: node.account.ssh("cat " + file, allow_fail=False) return True @@ -31,6 +32,15 @@ def file_exists(node, file): return False +def line_count(node, file): + """Return the line count of file on node""" + out = [line for line in node.account.ssh_capture("wc -l %s" % file)] + if len(out) != 1: + raise Exception("Expected single line of output from wc -l") + + return int(out[0].strip().split(" ")[0]) + + class ConsoleConsumerTest(Test): """Sanity checks on console consumer service class.""" def __init__(self, test_context): @@ -58,12 +68,10 @@ class ConsoleConsumerTest(Test): # Verify that log output is happening if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10): raise Exception("Timed out waiting for log file to exist") - consumer_log_lines = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.LOG_FILE)] - assert len(consumer_log_lines) > 0 + assert line_count(node, ConsoleConsumer.LOG_FILE) > 0 # Verify no consumed messages - consumed = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.STDOUT_CAPTURE)] - assert len(consumed) == 0 + assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0 self.consumer.stop_node(node) if not wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):