mirror of https://github.com/apache/kafka.git
KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (#11908)
With this change we stop including, by default, the non-production-grade connectors meant for demos and quick starts in the CLASSPATH and plugin.path of Connect deployments. The packages of these connectors will still be shipped with the Apache Kafka distribution and remain available for explicit inclusion. The changes have been tested through the system tests and the existing unit and integration tests.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Randall Hauch <rhauch@gmail.com>
This commit is contained in: parent ecb0e8eece, commit dd62ef2eda
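Since the filestream connectors still ship with the distribution, opting back in is a one-line change. A minimal sketch, assuming an installation under /opt/kafka and a 3.2.0 jar (both illustrative):

    # Option 1 (preferred): register the jar on the worker's plugin.path.
    # Path and version are illustrative; use your installation's absolute path.
    echo "plugin.path=/opt/kafka/libs/connect-file-3.2.0.jar" >> config/connect-standalone.properties

    # Option 2: put the jar back on the CLASSPATH before starting a worker.
    export CLASSPATH="$CLASSPATH:/opt/kafka/libs/connect-file-3.2.0.jar"
    bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties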
bin/kafka-run-class.sh

@@ -32,7 +32,7 @@ if [ -z "$INCLUDE_TEST_JARS" ]; then
 fi
 
 # Exclude jars not necessary for running commands.
-regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
+regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$"
 should_include_file() {
   if [ "$INCLUDE_TEST_JARS" = true ]; then
     return 0
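A quick way to see what the widened pattern now filters out; a standalone sketch (the script itself applies the pattern with grep, and these jar names are made up):

    #!/usr/bin/env bash
    # Illustrative check of the new exclusion pattern; jar names are invented.
    regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$"
    for jar in connect-file-3.2.0.jar connect-runtime-3.2.0.jar kafka-clients-3.2.0-test.jar; do
      if [[ $jar =~ $regex ]]; then
        echo "excluded: $jar"   # connect-file and test jars now match
      else
        echo "included: $jar"
      fi
    done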
@@ -171,7 +171,7 @@ do
   CLASSPATH="$CLASSPATH:$dir/*"
 done
 
-for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
+for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
 do
   for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
   do
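From a source checkout with a local Gradle build, a sketch like the following lists the connector jars the updated loop would put on the CLASSPATH; connect-file no longer appears (the build paths are assumptions):

    #!/usr/bin/env bash
    # Sketch: enumerate the connect jars the updated loop would pick up.
    base_dir="$(pwd)"   # assumes the Kafka source root with built artifacts
    for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"; do
      ls "$base_dir"/connect/"${cc_pkg}"/build/libs/connect-"${cc_pkg}"*.jar 2>/dev/null
    done
    # connect/file/build/libs/connect-file-*.jar still gets built, but is no longer listed.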
docs/connect.html

@@ -48,6 +48,7 @@
     <li><code>bootstrap.servers</code> - List of Kafka servers used to bootstrap connections to Kafka</li>
     <li><code>key.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
     <li><code>value.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
+    <li><code>plugin.path</code> (default <code>empty</code>) - a list of paths that contain Connect plugins (connectors, converters, transformations). Before running quick starts, users must add the absolute path that contains the example FileStreamSourceConnector and FileStreamSinkConnector packaged in <code>connect-file-"version".jar</code>, because these connectors are not included by default in the <code>CLASSPATH</code> or the <code>plugin.path</code> of the Connect worker (see the <a href="#connectconfigs_plugin.path">plugin.path</a> property for examples).</li>
 </ul>
 
 <p>The important configuration options specific to standalone mode are:</p>
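As the new docs entry says, plugin.path takes a list of paths. A hedged example of setting it for a distributed worker (both directories are illustrative):

    # plugin.path accepts a comma-separated list; absolute paths are recommended.
    echo "plugin.path=/usr/local/share/kafka/plugins,/opt/connectors" >> config/connect-distributed.properties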
docs/quickstart.html

@@ -32,8 +32,8 @@
     the latest Kafka release and extract it:
 </p>
 
-<pre class="line-numbers"><code class="language-bash">$ tar -xzf kafka_2.13-3.1.0.tgz
-$ cd kafka_2.13-3.1.0</code></pre>
+<pre class="line-numbers"><code class="language-bash">$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+$ cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
 </div>
 
 <div class="quickstart-step">
@@ -173,7 +173,20 @@ This is my second event</code></pre>
 </p>
 
 <p>
-First, we'll start by creating some seed data to test with:
+First, make sure to add <code class="language-bash">connect-file-{{fullDotVersion}}.jar</code> to the <code>plugin.path</code> property in the Connect worker's configuration.
+For the purpose of this quickstart we'll use a relative path and consider the connectors' package as an uber jar, which works when the quickstart commands are run from the installation directory.
+However, it's worth noting that for production deployments using absolute paths is always preferable. See <a href="#connectconfigs_plugin.path">plugin.path</a> for a detailed description of how to set this config.
+</p>
+
+<p>
+Edit the <code class="language-bash">config/connect-standalone.properties</code> file, add or change the <code>plugin.path</code> configuration property to match the following, and save the file:
+</p>
+
+<pre class="brush: bash;">
+> echo "plugin.path=lib/connect-file-{{fullDotVersion}}.jar"</pre>
+
+<p>
+Then, start by creating some seed data to test with:
 </p>
 
 <pre class="brush: bash;">
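After starting a worker with that plugin.path, one way to confirm the filestream connectors were picked up is Connect's REST API; this sketch assumes the default REST listener on localhost:8083:

    # List installed connector plugins and look for the filestream pair.
    curl -s http://localhost:8083/connector-plugins | grep FileStream
    # Expected to mention FileStreamSourceConnector and FileStreamSinkConnector.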
tests/kafkatest/services/connect.py

@@ -69,7 +69,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
                            "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
+    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec=60,
+                 include_filestream_connectors=False):
         super(ConnectServiceBase, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()
@@ -78,6 +79,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         self.startup_timeout_sec = startup_timeout_sec
         self.environment = {}
         self.external_config_template_func = None
+        self.include_filestream_connectors = include_filestream_connectors
+        self.logger.debug("include_filestream_connectors %s", include_filestream_connectors)
 
     def pids(self, node):
         """Return process ids for Kafka Connect processes."""
@@ -279,12 +282,34 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
         self.environment[envvar] = env_opts
 
+    def append_filestream_connectors_to_classpath(self):
+        if self.include_filestream_connectors:
+            cwd = os.getcwd()
+            self.logger.info("Including filestream connectors when starting Connect. "
+                             "Looking for jar locally in: %s" % cwd)
+            relative_path = "/connect/file/build/libs/"
+            local_dir = cwd + relative_path
+            lib_dir = self.path.home() + relative_path
+            for pwd, dirs, files in os.walk(local_dir):
+                for file in files:
+                    if file.startswith("connect-file") and file.endswith(".jar"):
+                        # Use the expected directory on the node instead of the path in the driver node
+                        file_path = lib_dir + file
+                        self.logger.debug("Appending %s to Connect worker's CLASSPATH" % file_path)
+                        return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
+            self.logger.info("Jar with filestream connectors was not found under %s" % lib_dir)
+        else:
+            self.logger.info("Starting Connect without filestream connectors in the CLASSPATH")
+
+        return None
+
+
 class ConnectStandaloneService(ConnectServiceBase):
     """Runs Kafka Connect in standalone mode."""
 
-    def __init__(self, context, kafka, files, startup_timeout_sec = 60):
-        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec)
+    def __init__(self, context, kafka, files, startup_timeout_sec=60, include_filestream_connectors=False):
+        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec,
+                                                       include_filestream_connectors)
 
     # For convenience since this service only makes sense with a single node
     @property
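When append_filestream_connectors_to_classpath finds a jar, the fragment it returns is prepended to the worker's start command; the effect is roughly the following (the path is illustrative of a system-test build):

    # Roughly the shell fragment the helper returns (path illustrative):
    export CLASSPATH=${CLASSPATH}:/opt/kafka-dev/connect/file/build/libs/connect-file-3.2.0-SNAPSHOT.jar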
@@ -299,6 +324,9 @@ class ConnectStandaloneService(ConnectServiceBase):
 
         cmd += fix_opts_for_new_jvm(node)
         cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
+        classpath = self.append_filestream_connectors_to_classpath()
+        cmd += classpath if classpath else ""
+
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
         cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE)
@@ -339,8 +367,9 @@ class ConnectDistributedService(ConnectServiceBase):
     """Runs Kafka Connect in distributed mode."""
 
     def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
-                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
-        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
+                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec=60,
+                 include_filestream_connectors=False):
+        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec, include_filestream_connectors)
         self.startup_mode = self.STARTUP_MODE_JOIN
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
@@ -355,6 +384,9 @@ class ConnectDistributedService(ConnectServiceBase):
         cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
+
+        classpath = self.append_filestream_connectors_to_classpath()
+        cmd += classpath if classpath else ""
         cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
         return cmd
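These services back Kafka's ducktape system tests; one way to exercise the Connect suites locally is the dockerized runner from a source checkout (script and variable names per tests/README; treat this invocation as a sketch):

    # Run the Connect system tests with the dockerized ducktape runner.
    TC_PATHS="tests/kafkatest/tests/connect" bash tests/docker/run_tests.sh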
tests/kafkatest/tests/connect/connect_distributed_test.py

@@ -80,7 +80,7 @@ class ConnectDistributedTest(Test):
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
 
-    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False):
+    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                   topics=self.topics, version=broker_version,
@@ -89,7 +89,8 @@ class ConnectDistributedTest(Test):
         for node in self.kafka.nodes:
             node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
 
-        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE],
+                                            include_filestream_connectors=include_filestream_connectors)
         self.cc.log_level = "DEBUG"
 
         self.zk.start()
@@ -370,7 +371,7 @@ class ConnectDistributedTest(Test):
         """
 
         self.CONNECT_PROTOCOL = connect_protocol
-        self.setup_services(security_protocol=security_protocol)
+        self.setup_services(security_protocol=security_protocol, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
@@ -522,7 +523,7 @@ class ConnectDistributedTest(Test):
     @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
     def test_transformations(self, connect_protocol):
         self.CONNECT_PROTOCOL = connect_protocol
-        self.setup_services(timestamp_type='CreateTime')
+        self.setup_services(timestamp_type='CreateTime', include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
@@ -610,7 +611,8 @@ class ConnectDistributedTest(Test):
         or relies upon the broker to auto-create the topics (v0.10.0.x and before).
         """
         self.CONNECT_PROTOCOL = connect_protocol
-        self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics, security_protocol=security_protocol)
+        self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics,
+                            security_protocol=security_protocol, include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
tests/kafkatest/tests/connect/connect_rest_test.py

@@ -73,7 +73,8 @@ class ConnectRestApiTest(KafkaTest):
             'test': {'partitions': 1, 'replication-factor': 1}
         })
 
-        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
+        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE],
+                                            include_filestream_connectors=True)
 
     @cluster(num_nodes=4)
     @matrix(connect_protocol=['compatible', 'eager'])
tests/kafkatest/tests/connect/connect_test.py

@@ -91,8 +91,10 @@ class ConnectStandaloneFileTest(Test):
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                   topics=self.topics, controller_num_nodes_override=self.num_zk)
 
-        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE],
+                                               include_filestream_connectors=True)
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
+                                             include_filestream_connectors=True)
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC_TEST,
                                                   consumer_timeout_ms=10000)
 
@@ -164,8 +166,10 @@ class ConnectStandaloneFileTest(Test):
         else:
             faulty_records = faulty_records[0]
 
-        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE],
+                                               include_filestream_connectors=True)
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
+                                             include_filestream_connectors=True)
 
         self.zk.start()
         self.kafka.start()