mirror of https://github.com/apache/kafka.git
KAFKA-3673: Connect tests don't handle concurrent config changes
Author: Liquan Pei <liquanpei@gmail.com> Reviewers: Ewen Cheslack-Postava <ewen@confluent.io> Closes #1340 from Ishiihara/connect-test-failure
This commit is contained in:
parent
d1bb2b9df1
commit
dbafc631ad
|
|
@ -22,6 +22,7 @@ import requests
|
|||
from ducktape.errors import DucktapeError
|
||||
from ducktape.services.service import Service
|
||||
from ducktape.utils.util import wait_until
|
||||
from kafkatest.utils.util import retry_on_exception
|
||||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
|
||||
|
|
@ -102,31 +103,30 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
|
|||
def config_filenames(self):
|
||||
return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])]
|
||||
|
||||
def list_connectors(self, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors', node=node, retries=retries, retry_backoff=retry_backoff)
|
||||
|
||||
def list_connectors(self, node=None):
|
||||
return self._rest('/connectors', node=node)
|
||||
|
||||
def create_connector(self, config, node=None):
|
||||
def create_connector(self, config, node=None, retries=0, retry_backoff=.01):
|
||||
create_request = {
|
||||
'name': config['name'],
|
||||
'config': config
|
||||
}
|
||||
return self._rest('/connectors', create_request, node=node, method="POST")
|
||||
return self._rest_with_retry('/connectors', create_request, node=node, method="POST", retries=retries, retry_backoff=retry_backoff)
|
||||
|
||||
def get_connector(self, name, node=None):
|
||||
return self._rest('/connectors/' + name, node=node)
|
||||
def get_connector(self, name, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name, node=node, retries=retries, retry_backoff=retry_backoff)
|
||||
|
||||
def get_connector_config(self, name, node=None):
|
||||
return self._rest('/connectors/' + name + '/config', node=node)
|
||||
def get_connector_config(self, name, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name + '/config', node=node, retries=retries, retry_backoff=retry_backoff)
|
||||
|
||||
def set_connector_config(self, name, config, node=None):
|
||||
return self._rest('/connectors/' + name + '/config', config, node=node, method="PUT")
|
||||
def set_connector_config(self, name, config, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name + '/config', config, node=node, method="PUT", retries=retries, retry_backoff=retry_backoff)
|
||||
|
||||
def get_connector_tasks(self, name, node=None):
|
||||
return self._rest('/connectors/' + name + '/tasks', node=node)
|
||||
def get_connector_tasks(self, name, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name + '/tasks', node=node, retries=retries, retry_backoff=retry_backoff)
|
||||
|
||||
def delete_connector(self, name, node=None):
|
||||
return self._rest('/connectors/' + name, node=node, method="DELETE")
|
||||
def delete_connector(self, name, node=None, retries=0, retry_backoff=.01):
|
||||
return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries, retry_backoff=retry_backoff)
|
||||
|
||||
def _rest(self, path, body=None, node=None, method="GET"):
|
||||
if node is None:
|
||||
|
|
@ -144,10 +144,13 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
|
|||
else:
|
||||
return resp.json()
|
||||
|
||||
def _rest_with_retry(self, path, body=None, node=None, method="GET", retries=0, retry_backoff=.01):
|
||||
return retry_on_exception(lambda: self._rest(path, body, node, method), ConnectRestError, retries, retry_backoff)
|
||||
|
||||
def _base_url(self, node):
|
||||
return 'http://' + node.account.externally_routable_ip + ':' + '8083'
|
||||
|
||||
|
||||
class ConnectStandaloneService(ConnectServiceBase):
|
||||
"""Runs Kafka Connect in standalone mode."""
|
||||
|
||||
|
|
@ -223,8 +226,6 @@ class ConnectDistributedService(ConnectServiceBase):
|
|||
raise RuntimeError("No process ids recorded")
|
||||
|
||||
|
||||
|
||||
|
||||
class ConnectRestError(RuntimeError):
|
||||
def __init__(self, status, msg, url):
|
||||
self.status = status
|
||||
|
|
@ -235,7 +236,6 @@ class ConnectRestError(RuntimeError):
|
|||
return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message
|
||||
|
||||
|
||||
|
||||
class VerifiableConnector(object):
|
||||
def messages(self):
|
||||
"""
|
||||
|
|
@ -261,6 +261,7 @@ class VerifiableConnector(object):
|
|||
self.logger.info("Destroying connector %s %s", type(self).__name__, self.name)
|
||||
self.cc.delete_connector(self.name)
|
||||
|
||||
|
||||
class VerifiableSource(VerifiableConnector):
|
||||
"""
|
||||
Helper class for running a verifiable source connector on a Kafka Connect cluster and analyzing the output.
|
||||
|
|
@ -284,6 +285,7 @@ class VerifiableSource(VerifiableConnector):
|
|||
'throughput': self.throughput
|
||||
})
|
||||
|
||||
|
||||
class VerifiableSink(VerifiableConnector):
|
||||
"""
|
||||
Helper class for running a verifiable sink connector on a Kafka Connect cluster and analyzing the output.
|
||||
|
|
|
|||
|
|
@ -15,8 +15,12 @@
|
|||
|
||||
from kafkatest.tests.kafka_test import KafkaTest
|
||||
from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
|
||||
from kafkatest.utils.util import retry_on_exception
|
||||
from ducktape.utils.util import wait_until
|
||||
import hashlib, subprocess, json, itertools
|
||||
import subprocess
|
||||
import json
|
||||
import itertools
|
||||
|
||||
|
||||
class ConnectRestApiTest(KafkaTest):
|
||||
"""
|
||||
|
|
@ -65,10 +69,10 @@ class ConnectRestApiTest(KafkaTest):
|
|||
sink_connector_props = self.render("connect-file-sink.properties")
|
||||
for connector_props in [source_connector_props, sink_connector_props]:
|
||||
connector_config = self._config_dict_from_props(connector_props)
|
||||
self.cc.create_connector(connector_config)
|
||||
self.cc.create_connector(connector_config, retries=120, retry_backoff=1)
|
||||
|
||||
# We should see the connectors appear
|
||||
wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]),
|
||||
wait_until(lambda: set(self.cc.list_connectors(retries=5, retry_backoff=1)) == set(["local-file-source", "local-file-sink"]),
|
||||
timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing")
|
||||
|
||||
# We'll only do very simple validation that the connectors and tasks really ran.
|
||||
|
|
@ -76,7 +80,6 @@ class ConnectRestApiTest(KafkaTest):
|
|||
node.account.ssh("echo -e -n " + repr(self.INPUTS) + " >> " + self.INPUT_FILE)
|
||||
wait_until(lambda: self.validate_output(self.INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
|
||||
|
||||
|
||||
# Trying to create the same connector again should cause an error
|
||||
try:
|
||||
self.cc.create_connector(self._config_dict_from_props(source_connector_props))
|
||||
|
|
@ -97,19 +100,18 @@ class ConnectRestApiTest(KafkaTest):
|
|||
expected_sink_info = {
|
||||
'name': 'local-file-sink',
|
||||
'config': self._config_dict_from_props(sink_connector_props),
|
||||
'tasks': [{ 'connector': 'local-file-sink', 'task': 0 }]
|
||||
'tasks': [{'connector': 'local-file-sink', 'task': 0 }]
|
||||
}
|
||||
sink_info = self.cc.get_connector("local-file-sink")
|
||||
assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info)
|
||||
sink_config = self.cc.get_connector_config("local-file-sink")
|
||||
assert expected_sink_info['config'] == sink_config, "Incorrect config: " + json.dumps(sink_config)
|
||||
|
||||
|
||||
# Validate that we can get info about tasks. This info should definitely be available now without waiting since
|
||||
# we've already seen data appear in files.
|
||||
# TODO: It would be nice to validate a complete listing, but that doesn't make sense for the file connectors
|
||||
expected_source_task_info = [{
|
||||
'id': { 'connector': 'local-file-source', 'task': 0 },
|
||||
'id': {'connector': 'local-file-source', 'task': 0},
|
||||
'config': {
|
||||
'task.class': 'org.apache.kafka.connect.file.FileStreamSourceTask',
|
||||
'file': self.INPUT_FILE,
|
||||
|
|
@ -119,7 +121,7 @@ class ConnectRestApiTest(KafkaTest):
|
|||
source_task_info = self.cc.get_connector_tasks("local-file-source")
|
||||
assert expected_source_task_info == source_task_info, "Incorrect info:" + json.dumps(source_task_info)
|
||||
expected_sink_task_info = [{
|
||||
'id': { 'connector': 'local-file-sink', 'task': 0 },
|
||||
'id': {'connector': 'local-file-sink', 'task': 0},
|
||||
'config': {
|
||||
'task.class': 'org.apache.kafka.connect.file.FileStreamSinkTask',
|
||||
'file': self.OUTPUT_FILE,
|
||||
|
|
@ -139,9 +141,9 @@ class ConnectRestApiTest(KafkaTest):
|
|||
node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2)
|
||||
wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
|
||||
|
||||
self.cc.delete_connector("local-file-source")
|
||||
self.cc.delete_connector("local-file-sink")
|
||||
wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
|
||||
self.cc.delete_connector("local-file-source", retries=120, retry_backoff=1)
|
||||
self.cc.delete_connector("local-file-sink", retries=120, retry_backoff=1)
|
||||
wait_until(lambda: len(self.cc.list_connectors(retries=5, retry_backoff=1)) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
|
||||
|
||||
def validate_output(self, input):
|
||||
input_set = set(input)
|
||||
|
|
@ -151,7 +153,6 @@ class ConnectRestApiTest(KafkaTest):
|
|||
]))
|
||||
return input_set == output_set
|
||||
|
||||
|
||||
def file_contents(self, node, file):
|
||||
try:
|
||||
# Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
from kafkatest import __version__ as __kafkatest_version__
|
||||
|
||||
import re
|
||||
import time
|
||||
|
||||
|
||||
def kafkatest_version():
|
||||
|
|
@ -71,3 +72,14 @@ def is_int_with_prefix(msg):
|
|||
raise Exception("Unexpected message format. Message should be of format: integer "
|
||||
"prefix dot integer value, but one of the two parts (before or after dot) "
|
||||
"are not integers. Message: %s" % (msg))
|
||||
|
||||
|
||||
def retry_on_exception(fun, exception, retries, retry_backoff=.01):
|
||||
exception_to_throw = None
|
||||
for i in range(0, retries + 1):
|
||||
try:
|
||||
return fun()
|
||||
except exception as e:
|
||||
exception_to_throw = e
|
||||
time.sleep(retry_backoff)
|
||||
raise exception_to_throw
|
||||
|
|
|
|||
Loading…
Reference in New Issue