2011-08-02 07:41:24 +08:00
#!/bin/bash
2011-10-08 03:51:28 +08:00
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
2014-06-05 13:20:50 +08:00
#
2011-10-08 03:51:28 +08:00
# http://www.apache.org/licenses/LICENSE-2.0
2014-06-05 13:20:50 +08:00
#
2011-10-08 03:51:28 +08:00
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2011-08-02 07:41:24 +08:00
if [ $# -lt 1 ] ;
then
2013-11-19 13:02:00 +08:00
echo " USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts] "
2011-08-02 07:41:24 +08:00
exit 1
fi
2023-10-26 17:29:39 +08:00
# WINDOWS_OS_FORMAT == 1 if Cygwin or MinGW is detected, else 0.
if [ [ $( uname -a) = ~ "CYGWIN" || $( uname -a) = ~ "MINGW" || $( uname -a) = ~ "MSYS" ] ] ; then
WINDOWS_OS_FORMAT = 1
2025-01-01 21:06:01 +08:00
export MSYS2_ARG_CONV_EXCL = " -Xlog:gc*:file=;-Dlog4j2.configurationFile=; $MSYS2_ARG_CONV_EXCL "
2016-08-09 23:33:53 +08:00
else
2023-10-26 17:29:39 +08:00
WINDOWS_OS_FORMAT = 0
2016-08-09 23:33:53 +08:00
fi
2016-04-30 01:28:33 +08:00
if [ -z " $INCLUDE_TEST_JARS " ] ; then
INCLUDE_TEST_JARS = false
fi
# Exclude jars not necessary for running commands.
2022-03-31 04:15:42 +08:00
regex = " (-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar) $"
2016-04-30 01:28:33 +08:00
should_include_file( ) {
if [ " $INCLUDE_TEST_JARS " = true ] ; then
return 0
fi
file = $1
2022-09-16 14:32:53 +08:00
if [ -z " $( echo " $file " | grep -E " $regex " ) " ] ; then
2016-04-30 01:28:33 +08:00
return 0
else
return 1
fi
}
2011-08-02 07:41:24 +08:00
base_dir = $( dirname $0 ) /..
2013-07-11 23:06:43 +08:00
if [ -z " $SCALA_VERSION " ] ; then
2024-09-30 02:08:51 +08:00
SCALA_VERSION = 2.13.15
2020-02-23 00:44:51 +08:00
if [ [ -f " $base_dir /gradle.properties " ] ] ; then
SCALA_VERSION = ` grep "^scalaVersion=" " $base_dir /gradle.properties " | cut -d= -f 2`
fi
2014-09-15 01:45:37 +08:00
fi
if [ -z " $SCALA_BINARY_VERSION " ] ; then
2016-11-17 10:15:14 +08:00
SCALA_BINARY_VERSION = $( echo $SCALA_VERSION | cut -f 1-2 -d '.' )
2013-07-11 23:06:43 +08:00
fi
2013-01-24 14:23:44 +08:00
2014-02-08 05:48:04 +08:00
# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
2015-11-04 00:10:37 +08:00
shopt -s nullglob
2019-07-29 09:18:59 +08:00
if [ -z " $UPGRADE_KAFKA_STREAMS_TEST_VERSION " ] ; then
for dir in " $base_dir " /core/build/dependant-libs-${ SCALA_VERSION } *;
do
CLASSPATH = " $CLASSPATH : $dir /* "
done
fi
2014-02-08 05:48:04 +08:00
2016-05-13 19:10:09 +08:00
for file in " $base_dir " /examples/build/libs/kafka-examples*.jar;
2014-02-08 05:48:04 +08:00
do
2016-04-30 01:28:33 +08:00
if should_include_file " $file " ; then
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH " :" $file "
2016-04-30 01:28:33 +08:00
fi
2014-02-08 05:48:04 +08:00
done
2018-04-07 08:00:52 +08:00
if [ -z " $UPGRADE_KAFKA_STREAMS_TEST_VERSION " ] ; then
clients_lib_dir = $( dirname $0 ) /../clients/build/libs
streams_lib_dir = $( dirname $0 ) /../streams/build/libs
2019-12-12 01:48:23 +08:00
streams_dependant_clients_lib_dir = $( dirname $0 ) /../streams/build/dependant-libs-${ SCALA_VERSION }
2018-04-07 08:00:52 +08:00
else
clients_lib_dir = /opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION /libs
streams_lib_dir = $clients_lib_dir
2019-12-12 01:48:23 +08:00
streams_dependant_clients_lib_dir = $streams_lib_dir
2018-04-07 08:00:52 +08:00
fi
2012-01-06 08:43:46 +08:00
2018-04-07 08:00:52 +08:00
for file in " $clients_lib_dir " /kafka-clients*.jar;
2015-09-26 08:27:58 +08:00
do
2016-04-30 01:28:33 +08:00
if should_include_file " $file " ; then
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH " :" $file "
2016-04-30 01:28:33 +08:00
fi
2016-02-24 04:14:26 +08:00
done
2018-04-07 08:00:52 +08:00
for file in " $streams_lib_dir " /kafka-streams*.jar;
2016-03-02 10:53:58 +08:00
do
2016-04-30 01:28:33 +08:00
if should_include_file " $file " ; then
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH " :" $file "
2016-04-30 01:28:33 +08:00
fi
2016-03-02 10:53:58 +08:00
done
2018-04-07 08:00:52 +08:00
if [ -z " $UPGRADE_KAFKA_STREAMS_TEST_VERSION " ] ; then
for file in " $base_dir " /streams/examples/build/libs/kafka-streams-examples*.jar;
do
if should_include_file " $file " ; then
CLASSPATH = " $CLASSPATH " :" $file "
fi
done
else
VERSION_NO_DOTS = ` echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g' `
SHORT_VERSION_NO_DOTS = ${ VERSION_NO_DOTS : 0 : (( ${# VERSION_NO_DOTS } - 1)) } # remove last char, ie, bug-fix number
for file in " $base_dir " /streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS /build/libs/kafka-streams-upgrade-system-tests*.jar;
do
if should_include_file " $file " ; then
MINOR: Enable ignored upgrade system tests - trunk (#5605)
Removed ignore annotations from the upgrade tests. This PR includes the following changes for updating the upgrade tests:
* Uploaded new versions 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, and 2.0.0 (in the associated scala versions) to kafka-packages
* Update versions in version.py, Dockerfile, base.sh
* Added new versions to StreamsUpgradeTest.test_upgrade_downgrade_brokers including version 2.0.0
* Added new versions StreamsUpgradeTest.test_simple_upgrade_downgrade test excluding version 2.0.0
* Version 2.0.0 is excluded from the streams upgrade/downgrade test as StreamsConfig needs an update for the new version, requiring a KIP. Once the community votes the KIP in, a minor follow-up PR can be pushed to add the 2.0.0 version to the upgrade test.
* Fixed minor bug in kafka-run-class.sh for classpath in upgrade/downgrade tests across versions.
* Follow on PRs for 0.10.2x, 0.11.0x, 1.0.x, 1.1.x, and 2.0.x will be pushed soon with the same updates required for the specific version.
Reviewers: Eno Thereska <eno.thereska@gmail.com>, John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
2018-09-14 04:46:47 +08:00
CLASSPATH = " $file " :" $CLASSPATH "
2018-04-07 08:00:52 +08:00
fi
done
fi
2019-12-12 01:48:23 +08:00
for file in " $streams_dependant_clients_lib_dir " /rocksdb*.jar;
do
CLASSPATH = " $CLASSPATH " :" $file "
done
for file in " $streams_dependant_clients_lib_dir " /*hamcrest*.jar;
2016-02-24 04:14:26 +08:00
do
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH " :" $file "
2015-09-26 08:27:58 +08:00
done
2021-02-27 09:13:20 +08:00
for file in " $base_dir " /shell/build/libs/kafka-shell*.jar;
do
if should_include_file " $file " ; then
CLASSPATH = " $CLASSPATH " :" $file "
fi
done
for dir in " $base_dir " /shell/build/dependant-libs-${ SCALA_VERSION } *;
do
CLASSPATH = " $CLASSPATH : $dir /* "
done
2016-05-13 19:10:09 +08:00
for file in " $base_dir " /tools/build/libs/kafka-tools*.jar;
2015-07-29 08:22:14 +08:00
do
2016-04-30 01:28:33 +08:00
if should_include_file " $file " ; then
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH " :" $file "
2016-04-30 01:28:33 +08:00
fi
2015-07-29 08:22:14 +08:00
done
2016-05-13 19:10:09 +08:00
for dir in " $base_dir " /tools/build/dependant-libs-${ SCALA_VERSION } *;
2015-07-29 08:22:14 +08:00
do
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH : $dir /* "
2015-07-29 08:22:14 +08:00
done
2021-04-16 02:37:15 +08:00
for file in " $base_dir " /trogdor/build/libs/trogdor-*.jar;
do
if should_include_file " $file " ; then
CLASSPATH = " $CLASSPATH " :" $file "
fi
done
for dir in " $base_dir " /trogdor/build/dependant-libs-${ SCALA_VERSION } *;
do
CLASSPATH = " $CLASSPATH : $dir /* "
done
2022-03-31 04:15:42 +08:00
for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
KAFKA-2366; Initial patch for Copycat
This is an initial patch implementing the basics of Copycat for KIP-26.
The intent here is to start a review of the key pieces of the core API and get a reasonably functional, baseline, non-distributed implementation of Copycat in place to get things rolling. The current patch has a number of known issues that need to be addressed before a final version:
* Some build-related issues. Specifically, requires some locally-installed dependencies (see below), ignores checkstyle for the runtime data library because it's lifted from Avro currently and likely won't last in its current form, and some Gradle task dependencies aren't quite right because I haven't gotten rid of the dependency on `core` (which should now be an easy patch since new consumer groups are in a much better state).
* This patch currently depends on some Confluent trunk code because I prototyped with our Avro serializers w/ schema-registry support. We need to figure out what we want to provide as an example built-in set of serializers. Unlike core Kafka where we could ignore the issue, providing only ByteArray or String serializers, this is pretty central to how Copycat works.
* This patch uses a hacked up version of Avro as its runtime data format. Not sure if we want to go through the entire API discussion just to get some basic code committed, so I filed KAFKA-2367 to handle that separately. The core connector APIs and the runtime data APIs are entirely orthogonal.
* This patch needs some updates to get aligned with recent new consumer changes (specifically, I'm aware of the ConcurrentModificationException issue on exit). More generally, the new consumer is in flux but Copycat depends on it, so there are likely to be some negative interactions.
* The layout feels a bit awkward to me right now because I ported it from a Maven layout. We don't have nearly the same level of granularity in Kafka currently (core and clients, plus the mostly ignored examples, log4j-appender, and a couple of contribs). We might want to reorganize, although keeping data+api separate from runtime and connector plugins is useful for minimizing dependencies.
* There are a variety of other things (e.g., I'm not happy with the exception hierarchy/how they are currently handled, TopicPartition doesn't really need to be duplicated unless we want Copycat entirely isolated from the Kafka APIs, etc), but I expect those we'll cover in the review.
Before commenting on the patch, it's probably worth reviewing https://issues.apache.org/jira/browse/KAFKA-2365 and https://issues.apache.org/jira/browse/KAFKA-2366 to get an idea of what I had in mind for a) what we ultimately want with all the Copycat patches and b) what we aim to cover in this initial patch. My hope is that we can use a WIP patch (after the current obvious deficiencies are addressed) while recognizing that we want to make iterative progress with a bunch of subsequent PRs.
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Ismael Juma, Gwen Shapira
Closes #99 from ewencp/copycat and squashes the following commits:
a3a47a6 [Ewen Cheslack-Postava] Simplify Copycat exceptions, make them a subclass of KafkaException.
8c108b0 [Ewen Cheslack-Postava] Rename Coordinator to Herder to avoid confusion with the consumer coordinator.
7bf8075 [Ewen Cheslack-Postava] Make Copycat CLI speific to standalone mode, clean up some config and get rid of config storage in standalone mode.
656a003 [Ewen Cheslack-Postava] Clarify and expand the explanation of the Copycat Coordinator interface.
c0e5fdc [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
0fa7a36 [Ewen Cheslack-Postava] Mark Copycat classes as unstable and reduce visibility of some classes where possible.
d55d31e [Ewen Cheslack-Postava] Reorganize Copycat code to put it all under one top-level directory.
b29cb2c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
d713a21 [Ewen Cheslack-Postava] Address Gwen's review comments.
6787a85 [Ewen Cheslack-Postava] Make Converter generic to match serializers since some serialization formats do not require a base class of Object; update many other classes to have generic key and value class type parameters to match this change.
b194c73 [Ewen Cheslack-Postava] Split Copycat converter option into two options for key and value.
0b5a1a0 [Ewen Cheslack-Postava] Normalize naming to use partition for both source and Kafka, adjusting naming in CopycatRecord classes to clearly differentiate.
e345142 [Ewen Cheslack-Postava] Remove Copycat reflection utils, use existing Utils and ConfigDef functionality from clients package.
be5c387 [Ewen Cheslack-Postava] Minor cleanup
122423e [Ewen Cheslack-Postava] Style cleanup
6ba87de [Ewen Cheslack-Postava] Remove most of the Avro-based mock runtime data API, only preserving enough schema functionality to support basic primitive types for an initial patch.
4674d13 [Ewen Cheslack-Postava] Address review comments, clean up some code styling.
25b5739 [Ewen Cheslack-Postava] Fix sink task offset commit concurrency issue by moving it to the worker thread and waking up the consumer to ensure it exits promptly.
0aefe21 [Ewen Cheslack-Postava] Add log4j settings for Copycat.
220e42d [Ewen Cheslack-Postava] Replace Avro serializer with JSON serializer.
1243a7c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
5a618c6 [Ewen Cheslack-Postava] Remove offset serializers, instead reusing the existing serializers and removing schema projection support.
e849e10 [Ewen Cheslack-Postava] Remove duplicated TopicPartition implementation.
dec1379 [Ewen Cheslack-Postava] Switch to using new consumer coordinator instead of manually assigning partitions. Remove dependency of copycat-runtime on core.
4a9b4f3 [Ewen Cheslack-Postava] Add some helpful Copycat-specific build and test targets that cover all Copycat packages.
31cd1ca [Ewen Cheslack-Postava] Add CLI tools for Copycat.
e14942c [Ewen Cheslack-Postava] Add Copycat file connector.
0233456 [Ewen Cheslack-Postava] Add copycat-avro and copycat-runtime
11981d2 [Ewen Cheslack-Postava] Add copycat-data and copycat-api
2015-08-15 07:00:51 +08:00
do
2016-05-13 19:10:09 +08:00
for file in " $base_dir " /connect/${ cc_pkg } /build/libs/connect-${ cc_pkg } *.jar;
KAFKA-2366; Initial patch for Copycat
This is an initial patch implementing the basics of Copycat for KIP-26.
The intent here is to start a review of the key pieces of the core API and get a reasonably functional, baseline, non-distributed implementation of Copycat in place to get things rolling. The current patch has a number of known issues that need to be addressed before a final version:
* Some build-related issues. Specifically, requires some locally-installed dependencies (see below), ignores checkstyle for the runtime data library because it's lifted from Avro currently and likely won't last in its current form, and some Gradle task dependencies aren't quite right because I haven't gotten rid of the dependency on `core` (which should now be an easy patch since new consumer groups are in a much better state).
* This patch currently depends on some Confluent trunk code because I prototyped with our Avro serializers w/ schema-registry support. We need to figure out what we want to provide as an example built-in set of serializers. Unlike core Kafka where we could ignore the issue, providing only ByteArray or String serializers, this is pretty central to how Copycat works.
* This patch uses a hacked up version of Avro as its runtime data format. Not sure if we want to go through the entire API discussion just to get some basic code committed, so I filed KAFKA-2367 to handle that separately. The core connector APIs and the runtime data APIs are entirely orthogonal.
* This patch needs some updates to get aligned with recent new consumer changes (specifically, I'm aware of the ConcurrentModificationException issue on exit). More generally, the new consumer is in flux but Copycat depends on it, so there are likely to be some negative interactions.
* The layout feels a bit awkward to me right now because I ported it from a Maven layout. We don't have nearly the same level of granularity in Kafka currently (core and clients, plus the mostly ignored examples, log4j-appender, and a couple of contribs). We might want to reorganize, although keeping data+api separate from runtime and connector plugins is useful for minimizing dependencies.
* There are a variety of other things (e.g., I'm not happy with the exception hierarchy/how they are currently handled, TopicPartition doesn't really need to be duplicated unless we want Copycat entirely isolated from the Kafka APIs, etc), but I expect those we'll cover in the review.
Before commenting on the patch, it's probably worth reviewing https://issues.apache.org/jira/browse/KAFKA-2365 and https://issues.apache.org/jira/browse/KAFKA-2366 to get an idea of what I had in mind for a) what we ultimately want with all the Copycat patches and b) what we aim to cover in this initial patch. My hope is that we can use a WIP patch (after the current obvious deficiencies are addressed) while recognizing that we want to make iterative progress with a bunch of subsequent PRs.
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Ismael Juma, Gwen Shapira
Closes #99 from ewencp/copycat and squashes the following commits:
a3a47a6 [Ewen Cheslack-Postava] Simplify Copycat exceptions, make them a subclass of KafkaException.
8c108b0 [Ewen Cheslack-Postava] Rename Coordinator to Herder to avoid confusion with the consumer coordinator.
7bf8075 [Ewen Cheslack-Postava] Make Copycat CLI speific to standalone mode, clean up some config and get rid of config storage in standalone mode.
656a003 [Ewen Cheslack-Postava] Clarify and expand the explanation of the Copycat Coordinator interface.
c0e5fdc [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
0fa7a36 [Ewen Cheslack-Postava] Mark Copycat classes as unstable and reduce visibility of some classes where possible.
d55d31e [Ewen Cheslack-Postava] Reorganize Copycat code to put it all under one top-level directory.
b29cb2c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
d713a21 [Ewen Cheslack-Postava] Address Gwen's review comments.
6787a85 [Ewen Cheslack-Postava] Make Converter generic to match serializers since some serialization formats do not require a base class of Object; update many other classes to have generic key and value class type parameters to match this change.
b194c73 [Ewen Cheslack-Postava] Split Copycat converter option into two options for key and value.
0b5a1a0 [Ewen Cheslack-Postava] Normalize naming to use partition for both source and Kafka, adjusting naming in CopycatRecord classes to clearly differentiate.
e345142 [Ewen Cheslack-Postava] Remove Copycat reflection utils, use existing Utils and ConfigDef functionality from clients package.
be5c387 [Ewen Cheslack-Postava] Minor cleanup
122423e [Ewen Cheslack-Postava] Style cleanup
6ba87de [Ewen Cheslack-Postava] Remove most of the Avro-based mock runtime data API, only preserving enough schema functionality to support basic primitive types for an initial patch.
4674d13 [Ewen Cheslack-Postava] Address review comments, clean up some code styling.
25b5739 [Ewen Cheslack-Postava] Fix sink task offset commit concurrency issue by moving it to the worker thread and waking up the consumer to ensure it exits promptly.
0aefe21 [Ewen Cheslack-Postava] Add log4j settings for Copycat.
220e42d [Ewen Cheslack-Postava] Replace Avro serializer with JSON serializer.
1243a7c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
5a618c6 [Ewen Cheslack-Postava] Remove offset serializers, instead reusing the existing serializers and removing schema projection support.
e849e10 [Ewen Cheslack-Postava] Remove duplicated TopicPartition implementation.
dec1379 [Ewen Cheslack-Postava] Switch to using new consumer coordinator instead of manually assigning partitions. Remove dependency of copycat-runtime on core.
4a9b4f3 [Ewen Cheslack-Postava] Add some helpful Copycat-specific build and test targets that cover all Copycat packages.
31cd1ca [Ewen Cheslack-Postava] Add CLI tools for Copycat.
e14942c [Ewen Cheslack-Postava] Add Copycat file connector.
0233456 [Ewen Cheslack-Postava] Add copycat-avro and copycat-runtime
11981d2 [Ewen Cheslack-Postava] Add copycat-data and copycat-api
2015-08-15 07:00:51 +08:00
do
2016-04-30 01:28:33 +08:00
if should_include_file " $file " ; then
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH " :" $file "
2016-04-30 01:28:33 +08:00
fi
KAFKA-2366; Initial patch for Copycat
This is an initial patch implementing the basics of Copycat for KIP-26.
The intent here is to start a review of the key pieces of the core API and get a reasonably functional, baseline, non-distributed implementation of Copycat in place to get things rolling. The current patch has a number of known issues that need to be addressed before a final version:
* Some build-related issues. Specifically, requires some locally-installed dependencies (see below), ignores checkstyle for the runtime data library because it's lifted from Avro currently and likely won't last in its current form, and some Gradle task dependencies aren't quite right because I haven't gotten rid of the dependency on `core` (which should now be an easy patch since new consumer groups are in a much better state).
* This patch currently depends on some Confluent trunk code because I prototyped with our Avro serializers w/ schema-registry support. We need to figure out what we want to provide as an example built-in set of serializers. Unlike core Kafka where we could ignore the issue, providing only ByteArray or String serializers, this is pretty central to how Copycat works.
* This patch uses a hacked up version of Avro as its runtime data format. Not sure if we want to go through the entire API discussion just to get some basic code committed, so I filed KAFKA-2367 to handle that separately. The core connector APIs and the runtime data APIs are entirely orthogonal.
* This patch needs some updates to get aligned with recent new consumer changes (specifically, I'm aware of the ConcurrentModificationException issue on exit). More generally, the new consumer is in flux but Copycat depends on it, so there are likely to be some negative interactions.
* The layout feels a bit awkward to me right now because I ported it from a Maven layout. We don't have nearly the same level of granularity in Kafka currently (core and clients, plus the mostly ignored examples, log4j-appender, and a couple of contribs). We might want to reorganize, although keeping data+api separate from runtime and connector plugins is useful for minimizing dependencies.
* There are a variety of other things (e.g., I'm not happy with the exception hierarchy/how they are currently handled, TopicPartition doesn't really need to be duplicated unless we want Copycat entirely isolated from the Kafka APIs, etc), but I expect those we'll cover in the review.
Before commenting on the patch, it's probably worth reviewing https://issues.apache.org/jira/browse/KAFKA-2365 and https://issues.apache.org/jira/browse/KAFKA-2366 to get an idea of what I had in mind for a) what we ultimately want with all the Copycat patches and b) what we aim to cover in this initial patch. My hope is that we can use a WIP patch (after the current obvious deficiencies are addressed) while recognizing that we want to make iterative progress with a bunch of subsequent PRs.
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Ismael Juma, Gwen Shapira
Closes #99 from ewencp/copycat and squashes the following commits:
a3a47a6 [Ewen Cheslack-Postava] Simplify Copycat exceptions, make them a subclass of KafkaException.
8c108b0 [Ewen Cheslack-Postava] Rename Coordinator to Herder to avoid confusion with the consumer coordinator.
7bf8075 [Ewen Cheslack-Postava] Make Copycat CLI speific to standalone mode, clean up some config and get rid of config storage in standalone mode.
656a003 [Ewen Cheslack-Postava] Clarify and expand the explanation of the Copycat Coordinator interface.
c0e5fdc [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
0fa7a36 [Ewen Cheslack-Postava] Mark Copycat classes as unstable and reduce visibility of some classes where possible.
d55d31e [Ewen Cheslack-Postava] Reorganize Copycat code to put it all under one top-level directory.
b29cb2c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
d713a21 [Ewen Cheslack-Postava] Address Gwen's review comments.
6787a85 [Ewen Cheslack-Postava] Make Converter generic to match serializers since some serialization formats do not require a base class of Object; update many other classes to have generic key and value class type parameters to match this change.
b194c73 [Ewen Cheslack-Postava] Split Copycat converter option into two options for key and value.
0b5a1a0 [Ewen Cheslack-Postava] Normalize naming to use partition for both source and Kafka, adjusting naming in CopycatRecord classes to clearly differentiate.
e345142 [Ewen Cheslack-Postava] Remove Copycat reflection utils, use existing Utils and ConfigDef functionality from clients package.
be5c387 [Ewen Cheslack-Postava] Minor cleanup
122423e [Ewen Cheslack-Postava] Style cleanup
6ba87de [Ewen Cheslack-Postava] Remove most of the Avro-based mock runtime data API, only preserving enough schema functionality to support basic primitive types for an initial patch.
4674d13 [Ewen Cheslack-Postava] Address review comments, clean up some code styling.
25b5739 [Ewen Cheslack-Postava] Fix sink task offset commit concurrency issue by moving it to the worker thread and waking up the consumer to ensure it exits promptly.
0aefe21 [Ewen Cheslack-Postava] Add log4j settings for Copycat.
220e42d [Ewen Cheslack-Postava] Replace Avro serializer with JSON serializer.
1243a7c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
5a618c6 [Ewen Cheslack-Postava] Remove offset serializers, instead reusing the existing serializers and removing schema projection support.
e849e10 [Ewen Cheslack-Postava] Remove duplicated TopicPartition implementation.
dec1379 [Ewen Cheslack-Postava] Switch to using new consumer coordinator instead of manually assigning partitions. Remove dependency of copycat-runtime on core.
4a9b4f3 [Ewen Cheslack-Postava] Add some helpful Copycat-specific build and test targets that cover all Copycat packages.
31cd1ca [Ewen Cheslack-Postava] Add CLI tools for Copycat.
e14942c [Ewen Cheslack-Postava] Add Copycat file connector.
0233456 [Ewen Cheslack-Postava] Add copycat-avro and copycat-runtime
11981d2 [Ewen Cheslack-Postava] Add copycat-data and copycat-api
2015-08-15 07:00:51 +08:00
done
2015-11-09 14:11:03 +08:00
if [ -d " $base_dir /connect/ ${ cc_pkg } /build/dependant-libs " ] ; then
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH : $base_dir /connect/ ${ cc_pkg } /build/dependant-libs/* "
2015-11-04 00:10:37 +08:00
fi
KAFKA-2366; Initial patch for Copycat
This is an initial patch implementing the basics of Copycat for KIP-26.
The intent here is to start a review of the key pieces of the core API and get a reasonably functional, baseline, non-distributed implementation of Copycat in place to get things rolling. The current patch has a number of known issues that need to be addressed before a final version:
* Some build-related issues. Specifically, requires some locally-installed dependencies (see below), ignores checkstyle for the runtime data library because it's lifted from Avro currently and likely won't last in its current form, and some Gradle task dependencies aren't quite right because I haven't gotten rid of the dependency on `core` (which should now be an easy patch since new consumer groups are in a much better state).
* This patch currently depends on some Confluent trunk code because I prototyped with our Avro serializers w/ schema-registry support. We need to figure out what we want to provide as an example built-in set of serializers. Unlike core Kafka where we could ignore the issue, providing only ByteArray or String serializers, this is pretty central to how Copycat works.
* This patch uses a hacked up version of Avro as its runtime data format. Not sure if we want to go through the entire API discussion just to get some basic code committed, so I filed KAFKA-2367 to handle that separately. The core connector APIs and the runtime data APIs are entirely orthogonal.
* This patch needs some updates to get aligned with recent new consumer changes (specifically, I'm aware of the ConcurrentModificationException issue on exit). More generally, the new consumer is in flux but Copycat depends on it, so there are likely to be some negative interactions.
* The layout feels a bit awkward to me right now because I ported it from a Maven layout. We don't have nearly the same level of granularity in Kafka currently (core and clients, plus the mostly ignored examples, log4j-appender, and a couple of contribs). We might want to reorganize, although keeping data+api separate from runtime and connector plugins is useful for minimizing dependencies.
* There are a variety of other things (e.g., I'm not happy with the exception hierarchy/how they are currently handled, TopicPartition doesn't really need to be duplicated unless we want Copycat entirely isolated from the Kafka APIs, etc), but I expect those we'll cover in the review.
Before commenting on the patch, it's probably worth reviewing https://issues.apache.org/jira/browse/KAFKA-2365 and https://issues.apache.org/jira/browse/KAFKA-2366 to get an idea of what I had in mind for a) what we ultimately want with all the Copycat patches and b) what we aim to cover in this initial patch. My hope is that we can use a WIP patch (after the current obvious deficiencies are addressed) while recognizing that we want to make iterative progress with a bunch of subsequent PRs.
Author: Ewen Cheslack-Postava <me@ewencp.org>
Reviewers: Ismael Juma, Gwen Shapira
Closes #99 from ewencp/copycat and squashes the following commits:
a3a47a6 [Ewen Cheslack-Postava] Simplify Copycat exceptions, make them a subclass of KafkaException.
8c108b0 [Ewen Cheslack-Postava] Rename Coordinator to Herder to avoid confusion with the consumer coordinator.
7bf8075 [Ewen Cheslack-Postava] Make Copycat CLI speific to standalone mode, clean up some config and get rid of config storage in standalone mode.
656a003 [Ewen Cheslack-Postava] Clarify and expand the explanation of the Copycat Coordinator interface.
c0e5fdc [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
0fa7a36 [Ewen Cheslack-Postava] Mark Copycat classes as unstable and reduce visibility of some classes where possible.
d55d31e [Ewen Cheslack-Postava] Reorganize Copycat code to put it all under one top-level directory.
b29cb2c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
d713a21 [Ewen Cheslack-Postava] Address Gwen's review comments.
6787a85 [Ewen Cheslack-Postava] Make Converter generic to match serializers since some serialization formats do not require a base class of Object; update many other classes to have generic key and value class type parameters to match this change.
b194c73 [Ewen Cheslack-Postava] Split Copycat converter option into two options for key and value.
0b5a1a0 [Ewen Cheslack-Postava] Normalize naming to use partition for both source and Kafka, adjusting naming in CopycatRecord classes to clearly differentiate.
e345142 [Ewen Cheslack-Postava] Remove Copycat reflection utils, use existing Utils and ConfigDef functionality from clients package.
be5c387 [Ewen Cheslack-Postava] Minor cleanup
122423e [Ewen Cheslack-Postava] Style cleanup
6ba87de [Ewen Cheslack-Postava] Remove most of the Avro-based mock runtime data API, only preserving enough schema functionality to support basic primitive types for an initial patch.
4674d13 [Ewen Cheslack-Postava] Address review comments, clean up some code styling.
25b5739 [Ewen Cheslack-Postava] Fix sink task offset commit concurrency issue by moving it to the worker thread and waking up the consumer to ensure it exits promptly.
0aefe21 [Ewen Cheslack-Postava] Add log4j settings for Copycat.
220e42d [Ewen Cheslack-Postava] Replace Avro serializer with JSON serializer.
1243a7c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
5a618c6 [Ewen Cheslack-Postava] Remove offset serializers, instead reusing the existing serializers and removing schema projection support.
e849e10 [Ewen Cheslack-Postava] Remove duplicated TopicPartition implementation.
dec1379 [Ewen Cheslack-Postava] Switch to using new consumer coordinator instead of manually assigning partitions. Remove dependency of copycat-runtime on core.
4a9b4f3 [Ewen Cheslack-Postava] Add some helpful Copycat-specific build and test targets that cover all Copycat packages.
31cd1ca [Ewen Cheslack-Postava] Add CLI tools for Copycat.
e14942c [Ewen Cheslack-Postava] Add Copycat file connector.
0233456 [Ewen Cheslack-Postava] Add copycat-avro and copycat-runtime
11981d2 [Ewen Cheslack-Postava] Add copycat-data and copycat-api
2015-08-15 07:00:51 +08:00
done
2013-04-12 02:01:11 +08:00
# classpath addition for release
2016-05-13 19:10:09 +08:00
for file in " $base_dir " /libs/*;
2016-04-30 01:28:33 +08:00
do
if should_include_file " $file " ; then
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH " :" $file "
2016-04-30 01:28:33 +08:00
fi
done
2013-04-12 02:01:11 +08:00
2016-05-13 19:10:09 +08:00
for file in " $base_dir " /core/build/libs/kafka_${ SCALA_BINARY_VERSION } *.jar;
2013-04-12 02:01:11 +08:00
do
2016-04-30 01:28:33 +08:00
if should_include_file " $file " ; then
2016-05-13 19:10:09 +08:00
CLASSPATH = " $CLASSPATH " :" $file "
2016-04-30 01:28:33 +08:00
fi
2013-04-12 02:01:11 +08:00
done
2015-11-04 00:10:37 +08:00
shopt -u nullglob
2013-04-12 02:01:11 +08:00
2017-08-10 06:48:09 +08:00
if [ -z " $CLASSPATH " ] ; then
2018-03-13 05:34:59 +08:00
echo " Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion= $SCALA_VERSION ' "
2017-08-10 06:48:09 +08:00
exit 1
fi
2013-07-11 23:06:43 +08:00
# JMX settings
2011-08-02 07:41:24 +08:00
if [ -z " $KAFKA_JMX_OPTS " ] ; then
2024-05-31 00:54:23 +08:00
KAFKA_JMX_OPTS = "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
2011-08-02 07:41:24 +08:00
fi
2013-01-24 14:23:44 +08:00
2013-07-11 23:06:43 +08:00
# JMX port to use
2011-08-02 07:41:24 +08:00
if [ $JMX_PORT ] ; then
KAFKA_JMX_OPTS = " $KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port= $JMX_PORT "
2022-12-02 11:46:37 +08:00
if ! echo " $KAFKA_JMX_OPTS " | grep -qF -- '-Dcom.sun.management.jmxremote.rmi.port=' ; then
# If unset, set the RMI port to address issues with monitoring Kafka running in containers
KAFKA_JMX_OPTS = " $KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.rmi.port= $JMX_PORT "
fi
2011-08-02 07:41:24 +08:00
fi
2013-01-24 14:23:44 +08:00
2015-08-06 02:46:11 +08:00
# Log directory to use
if [ " x $LOG_DIR " = "x" ] ; then
2016-08-09 23:33:53 +08:00
LOG_DIR = " $base_dir /logs "
2015-08-06 02:46:11 +08:00
fi
2013-07-11 23:06:43 +08:00
# Log4j settings
if [ -z " $KAFKA_LOG4J_OPTS " ] ; then
2015-08-06 02:46:11 +08:00
# Log to console. This is a tool.
2025-01-01 21:06:01 +08:00
LOG4J_DIR = " $base_dir /config/tools-log4j2.yaml "
2016-08-09 23:33:53 +08:00
# If Cygwin is detected, LOG4J_DIR is converted to Windows format.
2023-10-26 17:29:39 +08:00
( ( WINDOWS_OS_FORMAT ) ) && LOG4J_DIR = $( cygpath --path --mixed " ${ LOG4J_DIR } " )
2025-01-01 21:06:01 +08:00
KAFKA_LOG4J_OPTS = " -Dlog4j2.configurationFile= ${ LOG4J_DIR } "
2015-08-06 02:46:11 +08:00
else
2025-04-17 16:59:21 +08:00
if echo " $KAFKA_LOG4J_OPTS " | grep -E " log4j\.[^[:space:]]+(\.properties|\.xml) $" >/dev/null; then
2024-12-30 20:38:58 +08:00
# Enable Log4j 1.x configuration compatibility mode for Log4j 2
export LOG4J_COMPATIBILITY = true
echo DEPRECATED: A Log4j 1.x configuration file has been detected, which is no longer recommended. >& 2
echo To use a Log4j 2.x configuration, please see https://logging.apache.org/log4j/2.x/migrate-from-log4j1.html#Log4j2ConfigurationFormat for details about Log4j configuration file migration. >& 2
echo You can also use the \$ KAFKA_HOME/config/tools-log4j2.yaml file as a starting point. Make sure to remove the Log4j 1.x configuration after completing the migration. >& 2
fi
2015-08-06 02:46:11 +08:00
# create logs directory
if [ ! -d " $LOG_DIR " ] ; then
mkdir -p " $LOG_DIR "
fi
2013-07-11 23:06:43 +08:00
fi
2016-08-09 23:33:53 +08:00
# If Cygwin is detected, LOG_DIR is converted to Windows format.
2023-10-26 17:29:39 +08:00
( ( WINDOWS_OS_FORMAT ) ) && LOG_DIR = $( cygpath --path --mixed " ${ LOG_DIR } " )
KAFKA_LOG4J_CMD_OPTS = " -Dkafka.logs.dir= $LOG_DIR $KAFKA_LOG4J_OPTS "
2013-11-19 13:02:00 +08:00
2013-07-11 23:06:43 +08:00
# Generic jvm settings you want to add
if [ -z " $KAFKA_OPTS " ] ; then
KAFKA_OPTS = ""
fi
2016-02-24 07:55:09 +08:00
# Set Debug options if enabled
if [ " x $KAFKA_DEBUG " != "x" ] ; then
# Use default ports
DEFAULT_JAVA_DEBUG_PORT = "5005"
if [ -z " $JAVA_DEBUG_PORT " ] ; then
JAVA_DEBUG_PORT = " $DEFAULT_JAVA_DEBUG_PORT "
fi
# Use the defaults if JAVA_DEBUG_OPTS was not set
DEFAULT_JAVA_DEBUG_OPTS = " -agentlib:jdwp=transport=dt_socket,server=y,suspend= ${ DEBUG_SUSPEND_FLAG :- n } ,address= $JAVA_DEBUG_PORT "
if [ -z " $JAVA_DEBUG_OPTS " ] ; then
JAVA_DEBUG_OPTS = " $DEFAULT_JAVA_DEBUG_OPTS "
fi
echo " Enabling Java debug options: $JAVA_DEBUG_OPTS "
KAFKA_OPTS = " $JAVA_DEBUG_OPTS $KAFKA_OPTS "
fi
2013-07-11 23:06:43 +08:00
# Which java to use
2011-08-02 07:41:24 +08:00
if [ -z " $JAVA_HOME " ] ; then
JAVA = "java"
else
JAVA = " $JAVA_HOME /bin/java "
fi
2013-07-11 23:06:43 +08:00
# Memory options
if [ -z " $KAFKA_HEAP_OPTS " ] ; then
KAFKA_HEAP_OPTS = "-Xmx256M"
fi
# JVM performance options
2020-01-08 19:57:57 +08:00
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
2013-07-11 23:06:43 +08:00
if [ -z " $KAFKA_JVM_PERFORMANCE_OPTS " ] ; then
2020-01-08 19:57:57 +08:00
KAFKA_JVM_PERFORMANCE_OPTS = "-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
2013-07-11 23:06:43 +08:00
fi
2013-11-19 13:02:00 +08:00
while [ $# -gt 0 ] ; do
COMMAND = $1
case $COMMAND in
-name)
DAEMON_NAME = $2
CONSOLE_OUTPUT_FILE = $LOG_DIR /$DAEMON_NAME .out
shift 2
; ;
-loggc)
2015-01-26 10:43:17 +08:00
if [ -z " $KAFKA_GC_LOG_OPTS " ] ; then
2013-11-19 13:02:00 +08:00
GC_LOG_ENABLED = "true"
fi
shift
; ;
-daemon)
DAEMON_MODE = "true"
shift
; ;
*)
break
; ;
esac
done
2013-07-11 23:06:43 +08:00
# GC options
GC_FILE_SUFFIX = '-gc.log'
GC_LOG_FILE_NAME = ''
2013-11-19 13:02:00 +08:00
if [ " x $GC_LOG_ENABLED " = "xtrue" ] ; then
GC_LOG_FILE_NAME = $DAEMON_NAME $GC_FILE_SUFFIX
2018-04-20 06:34:13 +08:00
# The first segment of the version number, which is '1' for releases before Java 9
2017-08-10 08:43:02 +08:00
# it then becomes '9', '10', ...
2018-04-20 06:34:13 +08:00
# Some examples of the first line of `java --version`:
# 8 -> java version "1.8.0_152"
# 9.0.4 -> java version "9.0.4"
# 10 -> java version "10" 2018-03-20
# 10.0.1 -> java version "10.0.1" 2018-04-17
# We need to match to the end of the line to prevent sed from printing the characters that do not match
2020-04-30 16:21:23 +08:00
JAVA_MAJOR_VERSION = $( " $JAVA " -version 2>& 1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p' )
2017-08-10 08:43:02 +08:00
if [ [ " $JAVA_MAJOR_VERSION " -ge "9" ] ] ; then
2020-06-09 17:23:40 +08:00
KAFKA_GC_LOG_OPTS = " -Xlog:gc*:file= $LOG_DIR / $GC_LOG_FILE_NAME :time,tags:filecount=10,filesize=100M "
2017-08-10 08:43:02 +08:00
else
KAFKA_GC_LOG_OPTS = " -Xloggc: $LOG_DIR / $GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M "
fi
2013-07-11 23:06:43 +08:00
fi
2018-01-10 06:49:18 +08:00
# Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank)
# Syntax used on the right side is native Bash string manipulation; for more details see
# http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal"
CLASSPATH = ${ CLASSPATH # : }
2016-08-09 23:33:53 +08:00
# If Cygwin is detected, classpath is converted to Windows format.
2023-10-26 17:29:39 +08:00
( ( WINDOWS_OS_FORMAT ) ) && CLASSPATH = $( cygpath --path --mixed " ${ CLASSPATH } " )
2016-08-09 23:33:53 +08:00
2024-05-31 00:54:23 +08:00
# If KAFKA_MODE=native, it will bring up Kafka in the native mode.
# It expects the Kafka executable binary to be present at $base_dir/kafka.Kafka.
# This is specifically used to run system tests on native Kafka - by bringing up Kafka in the native mode.
if [ [ " x $KAFKA_MODE " = = "xnative" ] ] && [ [ " $* " = = *"kafka.Kafka" * ] ] ; then
exec $base_dir /kafka.Kafka start --config " $2 " $KAFKA_LOG4J_CMD_OPTS $KAFKA_JMX_OPTS $KAFKA_OPTS
2013-11-19 13:02:00 +08:00
else
2024-05-31 00:54:23 +08:00
# Launch mode
if [ " x $DAEMON_MODE " = "xtrue" ] ; then
nohup " $JAVA " $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp " $CLASSPATH " $KAFKA_OPTS " $@ " > " $CONSOLE_OUTPUT_FILE " 2>& 1 < /dev/null &
else
exec " $JAVA " $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_CMD_OPTS -cp " $CLASSPATH " $KAFKA_OPTS " $@ "
fi
2013-11-19 13:02:00 +08:00
fi