diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index 7f36854f9a7..ebc19b00f44 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -17,12 +17,12 @@ import json import os.path import random import signal +import time import requests from ducktape.errors import DucktapeError from ducktape.services.service import Service from ducktape.utils.util import wait_until -from kafkatest.utils.util import retry_on_exception from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin @@ -107,45 +107,49 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): def config_filenames(self): return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])] - def list_connectors(self, node=None, retries=0, retry_backoff=.01): - return self._rest_with_retry('/connectors', node=node, retries=retries, retry_backoff=retry_backoff) + def list_connectors(self, node=None, **kwargs): + return self._rest_with_retry('/connectors', node=node, **kwargs) - def create_connector(self, config, node=None, retries=0, retry_backoff=.01): + def create_connector(self, config, node=None, **kwargs): create_request = { 'name': config['name'], 'config': config } - return self._rest_with_retry('/connectors', create_request, node=node, method="POST", retries=retries, retry_backoff=retry_backoff) + return self._rest_with_retry('/connectors', create_request, node=node, method="POST", **kwargs) - def get_connector(self, name, node=None, retries=0, retry_backoff=.01): - return self._rest_with_retry('/connectors/' + name, node=node, retries=retries, retry_backoff=retry_backoff) + def get_connector(self, name, node=None, **kwargs): + return self._rest_with_retry('/connectors/' + name, node=node, **kwargs) - def get_connector_config(self, name, node=None, retries=0, retry_backoff=.01): - return self._rest_with_retry('/connectors/' + name + '/config', node=node, retries=retries, retry_backoff=retry_backoff) + def get_connector_config(self, name, node=None, **kwargs): + return self._rest_with_retry('/connectors/' + name + '/config', node=node, **kwargs) - def set_connector_config(self, name, config, node=None, retries=0, retry_backoff=.01): - return self._rest_with_retry('/connectors/' + name + '/config', config, node=node, method="PUT", retries=retries, retry_backoff=retry_backoff) + def set_connector_config(self, name, config, node=None, **kwargs): + # Unlike many other calls, a 409 when setting a connector config is expected if the connector already exists. + # However, we also might see 409s for other reasons (e.g. rebalancing). So we still perform retries at the cost + # of tests possibly taking longer to ultimately fail. Tests that care about this can explicitly override the + # number of retries. + return self._rest_with_retry('/connectors/' + name + '/config', config, node=node, method="PUT", **kwargs) - def get_connector_tasks(self, name, node=None, retries=0, retry_backoff=.01): - return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, retries=retries, retry_backoff=retry_backoff) + def get_connector_tasks(self, name, node=None, **kwargs): + return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, **kwargs) - def delete_connector(self, name, node=None, retries=0, retry_backoff=.01): - return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries, retry_backoff=retry_backoff) + def delete_connector(self, name, node=None, **kwargs): + return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", **kwargs) def get_connector_status(self, name, node=None): return self._rest('/connectors/' + name + '/status', node=node) - def restart_connector(self, name, node=None): - return self._rest('/connectors/' + name + '/restart', method="POST") + def restart_connector(self, name, node=None, **kwargs): + return self._rest_with_retry('/connectors/' + name + '/restart', node=node, method="POST", **kwargs) def restart_task(self, connector_name, task_id, node=None): - return self._rest('/connectors/' + connector_name + '/tasks/' + str(task_id) + '/restart', method="POST") + return self._rest('/connectors/' + connector_name + '/tasks/' + str(task_id) + '/restart', node=node, method="POST") def pause_connector(self, name, node=None): - return self._rest('/connectors/' + name + '/pause', method="PUT") + return self._rest('/connectors/' + name + '/pause', node=node, method="PUT") def resume_connector(self, name, node=None): - return self._rest('/connectors/' + name + '/resume', method="PUT") + return self._rest('/connectors/' + name + '/resume', node=node, method="PUT") def list_connector_plugins(self, node=None): return self._rest('/connector-plugins/', node=node) @@ -163,14 +167,28 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service): resp = meth(url, json=body) self.logger.debug("%s %s response: %d", url, method, resp.status_code) if resp.status_code > 400: + self.logger.debug("Connect REST API error for %s: %d %s", resp.url, resp.status_code, resp.text) raise ConnectRestError(resp.status_code, resp.text, resp.url) if resp.status_code == 204 or resp.status_code == 202: return None else: return resp.json() - def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=0, retry_backoff=.01): - return retry_on_exception(lambda: self._rest(path, body, node, method), ConnectRestError, retries, retry_backoff) + def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=40, retry_backoff=.25): + """ + Invokes a REST API with retries for errors that may occur during normal operation (notably 409 CONFLICT + responses that can occur due to rebalancing). + """ + exception_to_throw = None + for i in range(0, retries + 1): + try: + return self._rest(path, body, node, method) + except ConnectRestError as e: + exception_to_throw = e + if e.status != 409: + break + time.sleep(retry_backoff) + raise exception_to_throw def _base_url(self, node): return 'http://' + node.account.externally_routable_ip + ':' + '8083' diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index c32b8e179c9..0b004996caf 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -84,11 +84,11 @@ class ConnectRestApiTest(KafkaTest): self.verify_config(self.FILE_SINK_CONNECTOR, self.FILE_SINK_CONFIGS, configs) self.logger.info("Creating connectors") - self.cc.create_connector(source_connector_config, retries=120, retry_backoff=1) - self.cc.create_connector(sink_connector_config, retries=120, retry_backoff=1) + self.cc.create_connector(source_connector_config) + self.cc.create_connector(sink_connector_config) # We should see the connectors appear - wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source", "local-file-sink"]), + wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]), timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing") # We'll only do very simple validation that the connectors and tasks really ran. @@ -157,9 +157,9 @@ class ConnectRestApiTest(KafkaTest): node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2) wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") - self.cc.delete_connector("local-file-source", retries=120, retry_backoff=1) - self.cc.delete_connector("local-file-sink", retries=120, retry_backoff=1) - wait_until(lambda: len(self.cc.list_connectors(retries=5, retry_backoff=1)) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing") + self.cc.delete_connector("local-file-source") + self.cc.delete_connector("local-file-sink") + wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing") def validate_output(self, input): input_set = set(input) diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index c043bec743f..f004ece25d7 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -73,13 +73,3 @@ def is_int_with_prefix(msg): "prefix dot integer value, but one of the two parts (before or after dot) " "are not integers. Message: %s" % (msg)) - -def retry_on_exception(fun, exception, retries, retry_backoff=.01): - exception_to_throw = None - for i in range(0, retries + 1): - try: - return fun() - except exception as e: - exception_to_throw = e - time.sleep(retry_backoff) - raise exception_to_throw