mirror of https://github.com/apache/kafka.git
KAFKA-7225; Corrected system tests by generating external properties file (#5489)
Fix system tests from earlier #5445 by moving to the `ConnectSystemBase` class the creation & cleanup of a file that can be used as externalized secrets in connector configs. Reviewers: Arjun Satish <arjun@confluent.io>, Robert Yokota <rayokota@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
c968267bf1
commit
e577f6d366
|
|
@ -40,6 +40,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
|
|||
STDERR_FILE = os.path.join(PERSISTENT_ROOT, "connect.stderr")
|
||||
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect-log4j.properties")
|
||||
PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
|
||||
EXTERNAL_CONFIGS_FILE = os.path.join(PERSISTENT_ROOT, "connect-external-configs.properties")
|
||||
CONNECT_REST_PORT = 8083
|
||||
|
||||
# Currently the Connect worker supports waiting on three modes:
|
||||
|
|
@ -69,6 +70,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
|
|||
self.files = files
|
||||
self.startup_mode = self.STARTUP_MODE_LISTEN
|
||||
self.environment = {}
|
||||
self.external_config_template_func = None
|
||||
|
||||
def pids(self, node):
|
||||
"""Return process ids for Kafka Connect processes."""
|
||||
|
|
@ -87,6 +89,17 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
|
|||
self.config_template_func = config_template_func
|
||||
self.connector_config_templates = connector_config_templates
|
||||
|
||||
def set_external_configs(self, external_config_template_func):
|
||||
"""
|
||||
Set the properties that will be written in the external file properties
|
||||
as used by the org.apache.kafka.common.config.provider.FileConfigProvider.
|
||||
When this is used, the worker configuration must also enable the FileConfigProvider.
|
||||
This is not provided in the constructor in case the worker
|
||||
config generally needs access to ZK/Kafka services to
|
||||
create the configuration.
|
||||
"""
|
||||
self.external_config_template_func = external_config_template_func
|
||||
|
||||
def listening(self, node):
|
||||
try:
|
||||
cmd = "nc -z %s %s" % (node.account.hostname, self.CONNECT_REST_PORT)
|
||||
|
|
@ -145,7 +158,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
|
|||
def clean_node(self, node):
|
||||
node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
|
||||
self.security_config.clean_node(node)
|
||||
all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files)
|
||||
all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE, self.EXTERNAL_CONFIGS_FILE] + self.config_filenames() + self.files)
|
||||
node.account.ssh("rm -rf " + all_files, allow_fail=False)
|
||||
|
||||
def config_filenames(self):
|
||||
|
|
@ -263,6 +276,8 @@ class ConnectStandaloneService(ConnectServiceBase):
|
|||
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
|
||||
|
||||
self.security_config.setup_node(node)
|
||||
if self.external_config_template_func:
|
||||
node.account.create_file(self.EXTERNAL_CONFIGS_FILE, self.external_config_template_func(node))
|
||||
node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
|
||||
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
|
||||
remote_connector_configs = []
|
||||
|
|
@ -308,6 +323,8 @@ class ConnectDistributedService(ConnectServiceBase):
|
|||
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
|
||||
|
||||
self.security_config.setup_node(node)
|
||||
if self.external_config_template_func:
|
||||
node.account.create_file(self.EXTERNAL_CONFIGS_FILE, self.external_config_template_func(node))
|
||||
node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
|
||||
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
|
||||
if self.connector_config_templates:
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ class ConnectStandaloneFileTest(Test):
|
|||
|
||||
OFFSETS_FILE = "/mnt/connect.offsets"
|
||||
|
||||
TOPIC = "${file:/mnt/connect/connect-file-external.properties:topic.external}"
|
||||
TOPIC = "${file:" + EXTERNAL_CONFIGS_FILE + ":topic.external}"
|
||||
TOPIC_TEST = "test"
|
||||
|
||||
FIRST_INPUT_LIST = ["foo", "bar", "baz"]
|
||||
|
|
@ -100,14 +100,12 @@ class ConnectStandaloneFileTest(Test):
|
|||
self.zk.start()
|
||||
self.kafka.start()
|
||||
|
||||
source_external_props = os.path.join(self.source.PERSISTENT_ROOT, "connect-file-external.properties")
|
||||
self.source.node.account.create_file(source_external_props, self.render('connect-file-external.properties'))
|
||||
self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
|
||||
|
||||
sink_external_props = os.path.join(self.sink.PERSISTENT_ROOT, "connect-file-external.properties")
|
||||
self.sink.node.account.create_file(sink_external_props, self.render('connect-file-external.properties'))
|
||||
self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])
|
||||
|
||||
self.source.set_external_configs(lambda node: self.render("connect-file-external.properties", node=node))
|
||||
self.sink.set_external_configs(lambda node: self.render("connect-file-external.properties", node=node))
|
||||
|
||||
self.source.start()
|
||||
self.sink.start()
|
||||
|
||||
|
|
@ -182,6 +180,9 @@ class ConnectStandaloneFileTest(Test):
|
|||
self.override_value_converter_schemas_enable = False
|
||||
self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])
|
||||
|
||||
self.source.set_external_configs(lambda node: self.render("connect-file-external.properties", node=node))
|
||||
self.sink.set_external_configs(lambda node: self.render("connect-file-external.properties", node=node))
|
||||
|
||||
self.source.start()
|
||||
self.sink.start()
|
||||
|
||||
|
|
|
|||
|
|
@ -32,5 +32,8 @@ offset.storage.file.filename={{ OFFSETS_FILE }}
|
|||
# Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client
|
||||
request.timeout.ms=30000
|
||||
|
||||
# Allow connector configs to use externalized config values of the form:
|
||||
# ${file:/mnt/connect/connect-external-configs.properties:topic.external}
|
||||
#
|
||||
config.providers=file
|
||||
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
|
||||
|
|
|
|||
Loading…
Reference in New Issue