KAFKA-3782: Fix transient failure in connect distributed bounce test

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1650 from hachikuji/KAFKA-3782
This commit is contained in:
Jason Gustafson 2016-07-21 20:09:03 -07:00 committed by Ewen Cheslack-Postava
parent f1b37eec74
commit f5df13627a
1 changed file with 9 additions and 6 deletions

View File

@@ -329,7 +329,7 @@ class ConnectDistributedTest(Test):
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
self.cc.start() self.cc.start()
self.source = VerifiableSource(self.cc, tasks=num_tasks) self.source = VerifiableSource(self.cc, tasks=num_tasks, throughput=100)
self.source.start() self.source.start()
self.sink = VerifiableSink(self.cc, tasks=num_tasks) self.sink = VerifiableSink(self.cc, tasks=num_tasks)
self.sink.start() self.sink.start()
@@ -344,10 +344,13 @@ class ConnectDistributedTest(Test):
monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90, monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
err_msg="Kafka Connect worker didn't successfully join group and start work") err_msg="Kafka Connect worker didn't successfully join group and start work")
self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started) self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
-# If this is a hard bounce, give additional time for the consumer groups to recover. If we don't give
-# some time here, the next bounce may cause consumers to be shut down before they have any time to process
-# data and we can end up with zero data making it through the test.
-if not clean:
-    time.sleep(15)
+# Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are
+# some cases where a restart can cause a rebalance to take the full length of the session timeout
+# (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+# If we don't give enough time for the group to stabilize, the next bounce may cause consumers to
+# be shut down before they have any time to process data and we can end up with zero data making it
+# through the test.
+time.sleep(15)