diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index 6ee6aded19f..ba00e1014e0 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -438,10 +438,10 @@ class VerifiableSource(VerifiableConnector): self.throughput = throughput def committed_messages(self): - return filter(lambda m: 'committed' in m and m['committed'], self.messages()) + return list(filter(lambda m: 'committed' in m and m['committed'], self.messages())) def sent_messages(self): - return filter(lambda m: 'committed' not in m or not m['committed'], self.messages()) + return list(filter(lambda m: 'committed' not in m or not m['committed'], self.messages())) def start(self): self.logger.info("Creating connector VerifiableSourceConnector %s", self.name) @@ -467,10 +467,10 @@ class VerifiableSink(VerifiableConnector): self.topics = topics def flushed_messages(self): - return filter(lambda m: 'flushed' in m and m['flushed'], self.messages()) + return list(filter(lambda m: 'flushed' in m and m['flushed'], self.messages())) def received_messages(self): - return filter(lambda m: 'flushed' not in m or not m['flushed'], self.messages()) + return list(filter(lambda m: 'flushed' not in m or not m['flushed'], self.messages())) def start(self): self.logger.info("Creating connector VerifiableSinkConnector %s", self.name) diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 684cbc6da56..c661dfb285b 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -228,9 +228,9 @@ class ConnectDistributedTest(Test): err_msg="Failed to see connector transition to the PAUSED state") # verify that we do not produce new messages while paused - num_messages = len(list(self.source.sent_messages())) + num_messages = len(self.source.sent_messages()) time.sleep(10) - assert num_messages == len(list(self.source.sent_messages())), "Paused source connector should not produce any messages" + assert num_messages == len(self.source.sent_messages()), "Paused source connector should not produce any messages" self.cc.resume_connector(self.source.name) @@ -239,7 +239,7 @@ class ConnectDistributedTest(Test): err_msg="Failed to see connector transition to the RUNNING state") # after resuming, we should see records produced again - wait_until(lambda: len(list(self.source.sent_messages())) > num_messages, timeout_sec=30, + wait_until(lambda: len(self.source.sent_messages()) > num_messages, timeout_sec=30, err_msg="Failed to produce messages after resuming source connector") @cluster(num_nodes=5) @@ -259,7 +259,7 @@ class ConnectDistributedTest(Test): self.source = VerifiableSource(self.cc, topic=self.TOPIC) self.source.start() - wait_until(lambda: len(list(self.source.committed_messages())) > 0, timeout_sec=30, + wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30, err_msg="Timeout expired waiting for source task to produce a message") self.sink = VerifiableSink(self.cc, topics=[self.TOPIC]) @@ -276,9 +276,9 @@ class ConnectDistributedTest(Test): err_msg="Failed to see connector transition to the PAUSED state") # verify that we do not consume new messages while paused - num_messages = len(list(self.sink.received_messages())) + num_messages = len(self.sink.received_messages()) time.sleep(10) - assert num_messages == len(list(self.sink.received_messages())), "Paused sink connector should not consume any messages" + assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages" self.cc.resume_connector(self.sink.name) @@ -287,7 +287,7 @@ class ConnectDistributedTest(Test): err_msg="Failed to see connector transition to the RUNNING state") # after resuming, we should see records consumed again - wait_until(lambda: len(list(self.sink.received_messages())) > num_messages, timeout_sec=30, + wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30, err_msg="Failed to consume messages after resuming sink connector") @cluster(num_nodes=5)