KAFKA-10289; Fix failed connect_distributed_test.py (ConnectDistributedTest.test_bounce) (#9673)

In Python 3, `filter` functions return iterators rather than `list` so it can traverse only once. Hence, the following loop will only see "empty" and then validation fails.

```python
        src_messages = self.source.committed_messages() # return iterator
        sink_messages = self.sink.flushed_messages()) # return iterator
        for task in range(num_tasks):
            # only first task can "see" the result. following tasks see empty result
            src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task]
```

Reference: https://portingguide.readthedocs.io/en/latest/iterators.html#new-behavior-of-map-and-filter.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Chia-Ping Tsai 2020-12-10 05:38:17 +08:00 committed by GitHub
parent a8b668b37c
commit 6e15937feb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 11 deletions

View File

@ -438,10 +438,10 @@ class VerifiableSource(VerifiableConnector):
self.throughput = throughput
def committed_messages(self):
return filter(lambda m: 'committed' in m and m['committed'], self.messages())
return list(filter(lambda m: 'committed' in m and m['committed'], self.messages()))
def sent_messages(self):
return filter(lambda m: 'committed' not in m or not m['committed'], self.messages())
return list(filter(lambda m: 'committed' not in m or not m['committed'], self.messages()))
def start(self):
self.logger.info("Creating connector VerifiableSourceConnector %s", self.name)
@ -467,10 +467,10 @@ class VerifiableSink(VerifiableConnector):
self.topics = topics
def flushed_messages(self):
return filter(lambda m: 'flushed' in m and m['flushed'], self.messages())
return list(filter(lambda m: 'flushed' in m and m['flushed'], self.messages()))
def received_messages(self):
return filter(lambda m: 'flushed' not in m or not m['flushed'], self.messages())
return list(filter(lambda m: 'flushed' not in m or not m['flushed'], self.messages()))
def start(self):
self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)

View File

@ -228,9 +228,9 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector transition to the PAUSED state")
# verify that we do not produce new messages while paused
num_messages = len(list(self.source.sent_messages()))
num_messages = len(self.source.sent_messages())
time.sleep(10)
assert num_messages == len(list(self.source.sent_messages())), "Paused source connector should not produce any messages"
assert num_messages == len(self.source.sent_messages()), "Paused source connector should not produce any messages"
self.cc.resume_connector(self.source.name)
@ -239,7 +239,7 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector transition to the RUNNING state")
# after resuming, we should see records produced again
wait_until(lambda: len(list(self.source.sent_messages())) > num_messages, timeout_sec=30,
wait_until(lambda: len(self.source.sent_messages()) > num_messages, timeout_sec=30,
err_msg="Failed to produce messages after resuming source connector")
@cluster(num_nodes=5)
@ -259,7 +259,7 @@ class ConnectDistributedTest(Test):
self.source = VerifiableSource(self.cc, topic=self.TOPIC)
self.source.start()
wait_until(lambda: len(list(self.source.committed_messages())) > 0, timeout_sec=30,
wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30,
err_msg="Timeout expired waiting for source task to produce a message")
self.sink = VerifiableSink(self.cc, topics=[self.TOPIC])
@ -276,9 +276,9 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector transition to the PAUSED state")
# verify that we do not consume new messages while paused
num_messages = len(list(self.sink.received_messages()))
num_messages = len(self.sink.received_messages())
time.sleep(10)
assert num_messages == len(list(self.sink.received_messages())), "Paused sink connector should not consume any messages"
assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages"
self.cc.resume_connector(self.sink.name)
@ -287,7 +287,7 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector transition to the RUNNING state")
# after resuming, we should see records consumed again
wait_until(lambda: len(list(self.sink.received_messages())) > num_messages, timeout_sec=30,
wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
err_msg="Failed to consume messages after resuming sink connector")
@cluster(num_nodes=5)