diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index b05635820cd..6ee6aded19f 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -44,13 +44,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): CONNECT_REST_PORT = 8083 HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "connect_heap_dump.bin") - # Currently the Connect worker supports waiting on three modes: + # Currently the Connect worker supports waiting on four modes: STARTUP_MODE_INSTANT = 'INSTANT' """STARTUP_MODE_INSTANT: Start Connect worker and return immediately""" STARTUP_MODE_LOAD = 'LOAD' """STARTUP_MODE_LOAD: Start Connect worker and return after discovering and loading plugins""" STARTUP_MODE_LISTEN = 'LISTEN' """STARTUP_MODE_LISTEN: Start Connect worker and return after opening the REST port.""" + STARTUP_MODE_JOIN = 'JOIN' + """STARTUP_MODE_JOIN: Start Connect worker and return after joining the group.""" logs = { "connect_log": { @@ -115,8 +117,9 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): self.logger.debug("REST resources are not loaded yet") return False - def start(self, mode=STARTUP_MODE_LISTEN): - self.startup_mode = mode + def start(self, mode=None): + if mode: + self.startup_mode = mode super(ConnectServiceBase, self).start() def start_and_return_immediately(self, node, worker_type, remote_connector_configs): @@ -137,6 +140,15 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): err_msg="Kafka Connect failed to start on node: %s in condition mode: %s" % (str(node.account), self.startup_mode)) + def start_and_wait_to_join_group(self, node, worker_type, remote_connector_configs): + if worker_type != 'distributed': + raise RuntimeError("Cannot wait for joined group message for %s" % worker_type) + with node.account.monitor_log(self.LOG_FILE) as monitor: + self.start_and_return_immediately(node, worker_type, remote_connector_configs) + monitor.wait_until('Joined group', timeout_sec=self.startup_timeout_sec, + err_msg="Never saw message indicating Kafka Connect joined group on node: " + + "%s in condition mode: %s" % (str(node.account), self.startup_mode)) + def stop_node(self, node, clean_shutdown=True): self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account)) pids = self.pids(node) @@ -310,6 +322,8 @@ class ConnectStandaloneService(ConnectServiceBase): self.start_and_wait_to_load_plugins(node, 'standalone', remote_connector_configs) elif self.startup_mode == self.STARTUP_MODE_INSTANT: self.start_and_return_immediately(node, 'standalone', remote_connector_configs) + elif self.startup_mode == self.STARTUP_MODE_JOIN: + self.start_and_wait_to_join_group(node, 'standalone', remote_connector_configs) else: # The default mode is to wait until the complete startup of the worker self.start_and_wait_to_start_listening(node, 'standalone', remote_connector_configs) @@ -324,6 +338,7 @@ class ConnectDistributedService(ConnectServiceBase): def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets", configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60): super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec) + self.startup_mode = self.STARTUP_MODE_JOIN self.offsets_topic = offsets_topic self.configs_topic = configs_topic self.status_topic = status_topic @@ -357,9 +372,11 @@ class ConnectDistributedService(ConnectServiceBase): self.start_and_wait_to_load_plugins(node, 'distributed', '') elif self.startup_mode == self.STARTUP_MODE_INSTANT: self.start_and_return_immediately(node, 'distributed', '') + elif self.startup_mode == self.STARTUP_MODE_LISTEN: + self.start_and_wait_to_start_listening(node, 'distributed', '') else: # The default mode is to wait until the complete startup of the worker - self.start_and_wait_to_start_listening(node, 'distributed', '') + self.start_and_wait_to_join_group(node, 'distributed', '') if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded")