diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 6167583780b..490f930b8cc 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -32,7 +32,7 @@ if [ -z "$INCLUDE_TEST_JARS" ]; then fi # Exclude jars not necessary for running commands. -regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$" +regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$" should_include_file() { if [ "$INCLUDE_TEST_JARS" = true ]; then return 0 @@ -171,7 +171,7 @@ do CLASSPATH="$CLASSPATH:$dir/*" done -for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension" +for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension" do for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar; do diff --git a/docs/connect.html b/docs/connect.html index 66d621248de..1251c3ce683 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -48,6 +48,7 @@
  • bootstrap.servers - List of Kafka servers used to bootstrap connections to Kafka
  • key.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • value.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • +
  • plugin.path (default empty) - a list of paths that contain Connect plugins (connectors, converters, transformations). Before running quick starts, users must add the absolute path that contains the example FileStreamSourceConnector and FileStreamSinkConnector packaged in connect-file-"version".jar, because these connectors are not included by default to the CLASSPATH or the plugin.path of the Connect worker (see plugin.path property for examples).
  • The important configuration options specific to standalone mode are:

    diff --git a/docs/quickstart.html b/docs/quickstart.html index 2ef56c85057..70b13146e65 100644 --- a/docs/quickstart.html +++ b/docs/quickstart.html @@ -32,8 +32,8 @@ the latest Kafka release and extract it:

    -
    $ tar -xzf kafka_2.13-3.1.0.tgz
    -$ cd kafka_2.13-3.1.0
    +
    $ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
    +$ cd kafka_{{scalaVersion}}-{{fullDotVersion}}
    @@ -173,7 +173,20 @@ This is my second event

    - First, we'll start by creating some seed data to test with: + First, make sure to add connect-file-{{fullDotVersion}}.jar to the plugin.path property in the Connect worker's configuration. + For the purpose of this quickstart we'll use a relative path and consider the connectors' package as an uber jar, which works when the quickstart commands are run from the installation directory. + However, it's worth noting that for production deployments using absolute paths is always preferable. See plugin.path for a detailed description of how to set this config. +

    + +

    + Edit the config/connect-standalone.properties file, add or change the plugin.path configuration property match the following, and save the file: +

    + +
    +> echo "plugin.path=lib/connect-file-{{fullDotVersion}}.jar"
    + +

    + Then, start by creating some seed data to test with:

    diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
    index 26c0d927dcc..41c33ccb9e1 100644
    --- a/tests/kafkatest/services/connect.py
    +++ b/tests/kafkatest/services/connect.py
    @@ -69,7 +69,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
                 "collect_default": True}
         }
     
    -    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
    +    def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec=60,
    +                 include_filestream_connectors=False):
             super(ConnectServiceBase, self).__init__(context, num_nodes)
             self.kafka = kafka
             self.security_config = kafka.security_config.client_config()
    @@ -78,6 +79,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             self.startup_timeout_sec = startup_timeout_sec
             self.environment = {}
             self.external_config_template_func = None
    +        self.include_filestream_connectors = include_filestream_connectors
    +        self.logger.debug("include_filestream_connectors % s", include_filestream_connectors)
     
         def pids(self, node):
             """Return process ids for Kafka Connect processes."""
    @@ -279,12 +282,34 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
                 env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
             self.environment[envvar] = env_opts
     
    +    def append_filestream_connectors_to_classpath(self):
    +        if self.include_filestream_connectors:
    +            cwd = os.getcwd()
    +            self.logger.info("Including filestream connectors when starting Connect. "
    +                             "Looking for jar locally in: %s" % cwd)
    +            relative_path = "/connect/file/build/libs/"
    +            local_dir = cwd + relative_path
    +            lib_dir = self.path.home() + relative_path
    +            for pwd, dirs, files in os.walk(local_dir):
    +                for file in files:
    +                    if file.startswith("connect-file") and file.endswith(".jar"):
    +                        # Use the expected directory on the node instead of the path in the driver node
    +                        file_path = lib_dir + file
    +                        self.logger.debug("Appending %s to Connect worker's CLASSPATH" % file_path)
    +                        return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
    +            self.logger.info("Jar with filestream connectors was not found under %s" % lib_dir)
    +        else:
    +            self.logger.info("Starting Connect without filestream connectors in the CLASSPATH")
    +
    +        return None
    +
     
     class ConnectStandaloneService(ConnectServiceBase):
         """Runs Kafka Connect in standalone mode."""
     
    -    def __init__(self, context, kafka, files, startup_timeout_sec = 60):
    -        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec)
    +    def __init__(self, context, kafka, files, startup_timeout_sec=60, include_filestream_connectors=False):
    +        super(ConnectStandaloneService, self).__init__(context, 1, kafka, files, startup_timeout_sec,
    +                                                       include_filestream_connectors)
     
         # For convenience since this service only makes sense with a single node
         @property
    @@ -299,6 +324,9 @@ class ConnectStandaloneService(ConnectServiceBase):
     
             cmd += fix_opts_for_new_jvm(node)
             cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
    +        classpath = self.append_filestream_connectors_to_classpath()
    +        cmd += classpath if classpath else ""
    +
             for envvar in self.environment:
                 cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
             cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE)
    @@ -339,8 +367,9 @@ class ConnectDistributedService(ConnectServiceBase):
         """Runs Kafka Connect in distributed mode."""
     
         def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
    -                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec = 60):
    -        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec)
    +                 configs_topic="connect-configs", status_topic="connect-status", startup_timeout_sec=60,
    +                 include_filestream_connectors=False):
    +        super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files, startup_timeout_sec, include_filestream_connectors)
             self.startup_mode = self.STARTUP_MODE_JOIN
             self.offsets_topic = offsets_topic
             self.configs_topic = configs_topic
    @@ -355,6 +384,9 @@ class ConnectDistributedService(ConnectServiceBase):
             cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
             for envvar in self.environment:
                 cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
    +
    +        classpath = self.append_filestream_connectors_to_classpath()
    +        cmd += classpath if classpath else ""
             cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)
             cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
             return cmd
    diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
    index 6bc52b0d35f..970779f723f 100644
    --- a/tests/kafkatest/tests/connect/connect_distributed_test.py
    +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
    @@ -80,7 +80,7 @@ class ConnectDistributedTest(Test):
             self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
             self.schemas = True
     
    -    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False):
    +    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False):
             self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                       security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                       topics=self.topics, version=broker_version,
    @@ -89,7 +89,8 @@ class ConnectDistributedTest(Test):
                 for node in self.kafka.nodes:
                     node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
     
    -        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
    +        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE],
    +                                            include_filestream_connectors=include_filestream_connectors)
             self.cc.log_level = "DEBUG"
     
             self.zk.start()
    @@ -370,7 +371,7 @@ class ConnectDistributedTest(Test):
             """
     
             self.CONNECT_PROTOCOL = connect_protocol
    -        self.setup_services(security_protocol=security_protocol)
    +        self.setup_services(security_protocol=security_protocol, include_filestream_connectors=True)
             self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
     
             self.cc.start()
    @@ -522,7 +523,7 @@ class ConnectDistributedTest(Test):
         @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
         def test_transformations(self, connect_protocol):
             self.CONNECT_PROTOCOL = connect_protocol
    -        self.setup_services(timestamp_type='CreateTime')
    +        self.setup_services(timestamp_type='CreateTime', include_filestream_connectors=True)
             self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
             self.cc.start()
     
    @@ -610,7 +611,8 @@ class ConnectDistributedTest(Test):
             or relies upon the broker to auto-create the topics (v0.10.0.x and before).
             """
             self.CONNECT_PROTOCOL = connect_protocol
    -        self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics, security_protocol=security_protocol)
    +        self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics,
    +                            security_protocol=security_protocol, include_filestream_connectors=True)
             self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
     
             self.cc.start()
    diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
    index 4d978a232d2..ff44d9412f1 100644
    --- a/tests/kafkatest/tests/connect/connect_rest_test.py
    +++ b/tests/kafkatest/tests/connect/connect_rest_test.py
    @@ -73,7 +73,8 @@ class ConnectRestApiTest(KafkaTest):
                 'test': {'partitions': 1, 'replication-factor': 1}
             })
     
    -        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
    +        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE],
    +                                            include_filestream_connectors=True)
     
         @cluster(num_nodes=4)
         @matrix(connect_protocol=['compatible', 'eager'])
    diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
    index 1a7f6abfeb8..4c2a91a6036 100644
    --- a/tests/kafkatest/tests/connect/connect_test.py
    +++ b/tests/kafkatest/tests/connect/connect_test.py
    @@ -91,8 +91,10 @@ class ConnectStandaloneFileTest(Test):
                                       security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
                                       topics=self.topics, controller_num_nodes_override=self.num_zk)
     
    -        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
    -        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
    +        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE],
    +                                               include_filestream_connectors=True)
    +        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
    +                                             include_filestream_connectors=True)
             self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC_TEST,
                                                       consumer_timeout_ms=10000)
     
    @@ -164,8 +166,10 @@ class ConnectStandaloneFileTest(Test):
             else:
                 faulty_records = faulty_records[0]
     
    -        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
    -        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
    +        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE],
    +                                               include_filestream_connectors=True)
    +        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
    +                                             include_filestream_connectors=True)
     
             self.zk.start()
             self.kafka.start()