mirror of https://github.com/apache/kafka.git
KAFKA-4037: Make Connect REST API retries aware of 409 CONFLICT errors
Author: Ewen Cheslack-Postava <me@ewencp.org> Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk> Closes #1733 from ewencp/rest-api-retries
This commit is contained in:
parent
9af2e69ef4
commit
59cfa84801
|
@ -17,12 +17,12 @@ import json
|
|||
import os.path
|
||||
import random
|
||||
import signal
|
||||
import time
|
||||
|
||||
import requests
|
||||
from ducktape.errors import DucktapeError
|
||||
from ducktape.services.service import Service
|
||||
from ducktape.utils.util import wait_until
|
||||
from kafkatest.utils.util import retry_on_exception
|
||||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
|
||||
|
@ -107,45 +107,49 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
|
|||
def config_filenames(self):
|
||||
return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])]
|
||||
|
||||
def list_connectors(self, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors', node=node, retries=retries, retry_backoff=retry_backoff)
|
||||
def list_connectors(self, node=None, **kwargs):
|
||||
return self._rest_with_retry('/connectors', node=node, **kwargs)
|
||||
|
||||
def create_connector(self, config, node=None, retries=0, retry_backoff=.01):
|
||||
def create_connector(self, config, node=None, **kwargs):
|
||||
create_request = {
|
||||
'name': config['name'],
|
||||
'config': config
|
||||
}
|
||||
return self._rest_with_retry('/connectors', create_request, node=node, method="POST", retries=retries, retry_backoff=retry_backoff)
|
||||
return self._rest_with_retry('/connectors', create_request, node=node, method="POST", **kwargs)
|
||||
|
||||
def get_connector(self, name, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name, node=node, retries=retries, retry_backoff=retry_backoff)
|
||||
def get_connector(self, name, node=None, **kwargs):
|
||||
return self._rest_with_retry('/connectors/' + name, node=node, **kwargs)
|
||||
|
||||
def get_connector_config(self, name, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name + '/config', node=node, retries=retries, retry_backoff=retry_backoff)
|
||||
def get_connector_config(self, name, node=None, **kwargs):
|
||||
return self._rest_with_retry('/connectors/' + name + '/config', node=node, **kwargs)
|
||||
|
||||
def set_connector_config(self, name, config, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name + '/config', config, node=node, method="PUT", retries=retries, retry_backoff=retry_backoff)
|
||||
def set_connector_config(self, name, config, node=None, **kwargs):
|
||||
# Unlike many other calls, a 409 when setting a connector config is expected if the connector already exists.
|
||||
# However, we also might see 409s for other reasons (e.g. rebalancing). So we still perform retries at the cost
|
||||
# of tests possibly taking longer to ultimately fail. Tests that care about this can explicitly override the
|
||||
# number of retries.
|
||||
return self._rest_with_retry('/connectors/' + name + '/config', config, node=node, method="PUT", **kwargs)
|
||||
|
||||
def get_connector_tasks(self, name, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, retries=retries, retry_backoff=retry_backoff)
|
||||
def get_connector_tasks(self, name, node=None, **kwargs):
|
||||
return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, **kwargs)
|
||||
|
||||
def delete_connector(self, name, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries, retry_backoff=retry_backoff)
|
||||
def delete_connector(self, name, node=None, **kwargs):
|
||||
return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", **kwargs)
|
||||
|
||||
def get_connector_status(self, name, node=None):
|
||||
return self._rest('/connectors/' + name + '/status', node=node)
|
||||
|
||||
def restart_connector(self, name, node=None):
|
||||
return self._rest('/connectors/' + name + '/restart', method="POST")
|
||||
def restart_connector(self, name, node=None, **kwargs):
|
||||
return self._rest_with_retry('/connectors/' + name + '/restart', node=node, method="POST", **kwargs)
|
||||
|
||||
def restart_task(self, connector_name, task_id, node=None):
|
||||
return self._rest('/connectors/' + connector_name + '/tasks/' + str(task_id) + '/restart', method="POST")
|
||||
return self._rest('/connectors/' + connector_name + '/tasks/' + str(task_id) + '/restart', node=node, method="POST")
|
||||
|
||||
def pause_connector(self, name, node=None):
|
||||
return self._rest('/connectors/' + name + '/pause', method="PUT")
|
||||
return self._rest('/connectors/' + name + '/pause', node=node, method="PUT")
|
||||
|
||||
def resume_connector(self, name, node=None):
|
||||
return self._rest('/connectors/' + name + '/resume', method="PUT")
|
||||
return self._rest('/connectors/' + name + '/resume', node=node, method="PUT")
|
||||
|
||||
def list_connector_plugins(self, node=None):
|
||||
return self._rest('/connector-plugins/', node=node)
|
||||
|
@ -163,14 +167,28 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
|
|||
resp = meth(url, json=body)
|
||||
self.logger.debug("%s %s response: %d", url, method, resp.status_code)
|
||||
if resp.status_code > 400:
|
||||
self.logger.debug("Connect REST API error for %s: %d %s", resp.url, resp.status_code, resp.text)
|
||||
raise ConnectRestError(resp.status_code, resp.text, resp.url)
|
||||
if resp.status_code == 204 or resp.status_code == 202:
|
||||
return None
|
||||
else:
|
||||
return resp.json()
|
||||
|
||||
def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=0, retry_backoff=.01):
|
||||
return retry_on_exception(lambda: self._rest(path, body, node, method), ConnectRestError, retries, retry_backoff)
|
||||
def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=40, retry_backoff=.25):
|
||||
"""
|
||||
Invokes a REST API with retries for errors that may occur during normal operation (notably 409 CONFLICT
|
||||
responses that can occur due to rebalancing).
|
||||
"""
|
||||
exception_to_throw = None
|
||||
for i in range(0, retries + 1):
|
||||
try:
|
||||
return self._rest(path, body, node, method)
|
||||
except ConnectRestError as e:
|
||||
exception_to_throw = e
|
||||
if e.status != 409:
|
||||
break
|
||||
time.sleep(retry_backoff)
|
||||
raise exception_to_throw
|
||||
|
||||
def _base_url(self, node):
|
||||
return 'http://' + node.account.externally_routable_ip + ':' + '8083'
|
||||
|
|
|
@ -84,11 +84,11 @@ class ConnectRestApiTest(KafkaTest):
|
|||
self.verify_config(self.FILE_SINK_CONNECTOR, self.FILE_SINK_CONFIGS, configs)
|
||||
|
||||
self.logger.info("Creating connectors")
|
||||
self.cc.create_connector(source_connector_config, retries=120, retry_backoff=1)
|
||||
self.cc.create_connector(sink_connector_config, retries=120, retry_backoff=1)
|
||||
self.cc.create_connector(source_connector_config)
|
||||
self.cc.create_connector(sink_connector_config)
|
||||
|
||||
# We should see the connectors appear
|
||||
wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source", "local-file-sink"]),
|
||||
wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]),
|
||||
timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing")
|
||||
|
||||
# We'll only do very simple validation that the connectors and tasks really ran.
|
||||
|
@ -157,9 +157,9 @@ class ConnectRestApiTest(KafkaTest):
|
|||
node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2)
|
||||
wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
|
||||
|
||||
self.cc.delete_connector("local-file-source", retries=120, retry_backoff=1)
|
||||
self.cc.delete_connector("local-file-sink", retries=120, retry_backoff=1)
|
||||
wait_until(lambda: len(self.cc.list_connectors(retries=5, retry_backoff=1)) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
|
||||
self.cc.delete_connector("local-file-source")
|
||||
self.cc.delete_connector("local-file-sink")
|
||||
wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
|
||||
|
||||
def validate_output(self, input):
|
||||
input_set = set(input)
|
||||
|
|
|
@ -73,13 +73,3 @@ def is_int_with_prefix(msg):
|
|||
"prefix dot integer value, but one of the two parts (before or after dot) "
|
||||
"are not integers. Message: %s" % (msg))
|
||||
|
||||
|
||||
def retry_on_exception(fun, exception, retries, retry_backoff=.01):
|
||||
exception_to_throw = None
|
||||
for i in range(0, retries + 1):
|
||||
try:
|
||||
return fun()
|
||||
except exception as e:
|
||||
exception_to_throw = e
|
||||
time.sleep(retry_backoff)
|
||||
raise exception_to_throw
|
||||
|
|
Loading…
Reference in New Issue