diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index afb885d7c99..9240f322c79 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -46,7 +46,7 @@ class ProduceConsumeValidateTest(Test): def setup_producer_and_consumer(self): raise NotImplementedError("Subclasses should implement this") - def start_producer_and_consumer(self, async=False): + def start_producer_and_consumer(self): # Start background producer and consumer self.consumer.start() if (self.consumer_init_timeout_sec > 0): @@ -58,11 +58,11 @@ class ProduceConsumeValidateTest(Test): self.consumer_init_timeout_sec) self.producer.start() - wait_until(lambda: async or self.producer.num_acked > 5, + wait_until(lambda: self.producer.num_acked > 5, timeout_sec=self.producer_start_timeout_sec, err_msg="Producer failed to produce messages for %ds." %\ self.producer_start_timeout_sec) - wait_until(lambda: async or len(self.consumer.messages_consumed[1]) > 0, + wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=self.consumer_start_timeout_sec, err_msg="Consumer failed to consume messages for %ds." %\ self.consumer_start_timeout_sec) @@ -89,10 +89,10 @@ class ProduceConsumeValidateTest(Test): self.producer.stop() self.consumer.wait() - def run_produce_consume_validate(self, async=False, core_test_action=None, *args): + def run_produce_consume_validate(self, core_test_action=None, *args): """Top-level template for simple produce/consume/validate tests.""" try: - self.start_producer_and_consumer(async) + self.start_producer_and_consumer() if core_test_action is not None: core_test_action(*args)