KAFKA-10286: Connect system tests should wait for workers to join group (#9040)

Currently, the system tests `connect_distributed_test` and `connect_rest_test` only wait for the REST api to come up.
The startup of the worker includes an asynchronous process for joining the worker group and syncing with other workers.
There are some situations in which this sync takes an unusually long time, and the test continues without all workers up.
This leads to flakey test failures, as worker joins are not given sufficient time to timeout and retry without waiting explicitly.

This changes the `ConnectDistributedTest` to wait for the Joined group message to be printed to the logs before continuing with tests. I've activated this behavior by default, as it's a superset of the checks that were performed by default before.

This log message is present in every version of DistributedHerder that I could find, in slightly different forms, but always with `Joined group` at the beginning of the log message. This change should be safe to backport to any branch.

Signed-off-by: Greg Harris <gregh@confluent.io>
Author: Greg Harris <gregh@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Greg Harris 2020-07-20 06:48:02 -07:00 committed by GitHub
parent b02fa53419
commit 5a2a7c6348
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 4 deletions

View File

@ -44,13 +44,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
CONNECT_REST_PORT = 8083
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "connect_heap_dump.bin")
# Currently the Connect worker supports waiting on three modes:
# Currently the Connect worker supports waiting on four modes:
STARTUP_MODE_INSTANT = 'INSTANT'
"""STARTUP_MODE_INSTANT: Start Connect worker and return immediately"""
STARTUP_MODE_LOAD = 'LOAD'
"""STARTUP_MODE_LOAD: Start Connect worker and return after discovering and loading plugins"""
STARTUP_MODE_LISTEN = 'LISTEN'
"""STARTUP_MODE_LISTEN: Start Connect worker and return after opening the REST port."""
STARTUP_MODE_JOIN = 'JOIN'
"""STARTUP_MODE_JOIN: Start Connect worker and return after joining the group."""
logs = {
"connect_log": {
@ -115,8 +117,9 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
self.logger.debug("REST resources are not loaded yet")
return False
def start(self, mode=STARTUP_MODE_LISTEN):
self.startup_mode = mode
def start(self, mode=None):
if mode:
self.startup_mode = mode
super(ConnectServiceBase, self).start()
def start_and_return_immediately(self, node, worker_type, remote_connector_configs):
@ -137,6 +140,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
err_msg="Kafka Connect failed to start on node: %s in condition mode: %s" %
(str(node.account), self.startup_mode))
def start_and_wait_to_join_group(self, node, worker_type, remote_connector_configs):
if worker_type != 'distributed':
raise RuntimeError("Cannot wait for joined group message for %s" % worker_type)
with node.account.monitor_log(self.LOG_FILE) as monitor:
self.start_and_return_immediately(node, worker_type, remote_connector_configs)
monitor.wait_until('Joined group', timeout_sec=self.startup_timeout_sec,
err_msg="Never saw message indicating Kafka Connect joined group on node: " +
"%s in condition mode: %s" % (str(node.account), self.startup_mode))
def stop_node(self, node, clean_shutdown=True):
self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
pids = self.pids(node)
@ -310,6 +322,8 @@ class ConnectStandaloneService(ConnectServiceBase):
self.start_and_wait_to_load_plugins(node, 'standalone', remote_connector_configs)
elif self.startup_mode == self.STARTUP_MODE_INSTANT:
self.start_and_return_immediately(node, 'standalone', remote_connector_configs)
elif self.startup_mode == self.STARTUP_MODE_JOIN:
self.start_and_wait_to_join_group(node, 'standalone', remote_connector_configs)
else:
# The default mode is to wait until the complete startup of the worker
self.start_and_wait_to_start_listening(node, 'standalone', remote_connector_configs)
@ -324,6 +338,7 @@ class ConnectDistributedService(ConnectServiceBase):
def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
self.startup_mode = self.STARTUP_MODE_JOIN
self.offsets_topic = offsets_topic
self.configs_topic = configs_topic
self.status_topic = status_topic
@ -357,9 +372,11 @@ class ConnectDistributedService(ConnectServiceBase):
self.start_and_wait_to_load_plugins(node, 'distributed', '')
elif self.startup_mode == self.STARTUP_MODE_INSTANT:
self.start_and_return_immediately(node, 'distributed', '')
elif self.startup_mode == self.STARTUP_MODE_LISTEN:
self.start_and_wait_to_start_listening(node, 'distributed', '')
else:
# The default mode is to wait until the complete startup of the worker
self.start_and_wait_to_start_listening(node, 'distributed', '')
self.start_and_wait_to_join_group(node, 'distributed', '')
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")