mirror of https://github.com/apache/kafka.git
KAFKA-4450; Add upgrade tests for 0.10.1 and rename TRUNK to DEV_BRANCH to reduce confusion
Author: Ewen Cheslack-Postava <me@ewencp.org> Reviewers: Ismael Juma <ismael@juma.me.uk> Closes #2457 from ewencp/kafka-4450-upgrade-tests
This commit is contained in:
parent
8827a5b34e
commit
6264cc1557
|
@ -55,7 +55,7 @@ docker network inspect knw
|
|||
|
||||
for i in $(seq -w 1 ${KAFKA_NUM_CONTAINERS}); do
|
||||
echo knode${i}
|
||||
docker exec knode${i} bash -c "(tar xfz /kafka_src/core/build/distributions/kafka_*SNAPSHOT.tgz -C /opt || echo missing kafka tgz did you build kafka tarball) && mv /opt/kafka*SNAPSHOT /opt/kafka-trunk && ls -l /opt"
|
||||
docker exec knode${i} bash -c "(tar xfz /kafka_src/core/build/distributions/kafka_*SNAPSHOT.tgz -C /opt || echo missing kafka tgz did you build kafka tarball) && mv /opt/kafka*SNAPSHOT /opt/kafka-dev && ls -l /opt"
|
||||
docker exec knode01 bash -c "ssh knode$i hostname"
|
||||
done
|
||||
|
||||
|
@ -64,7 +64,7 @@ done
|
|||
(cd ${KAFKA_SRC} && ./gradlew copyDependantTestLibs)
|
||||
for i in $(seq -w 1 ${KAFKA_NUM_CONTAINERS}); do
|
||||
echo knode${i}
|
||||
docker exec knode${i} bash -c "cp /kafka_src/core/build/dependant-testlibs/* /opt/kafka-trunk/libs/"
|
||||
docker exec knode${i} bash -c "cp /kafka_src/core/build/dependant-testlibs/* /opt/kafka-dev/libs/"
|
||||
docker exec knode01 bash -c "ssh knode$i hostname"
|
||||
done
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
# due to python version naming restrictions, which are enforced by python packaging tools
|
||||
# (see https://www.python.org/dev/peps/pep-0440/)
|
||||
#
|
||||
# Instead, in trunk, the version should have a suffix of the form ".devN"
|
||||
# Instead, in development branches, the version should have a suffix of the form ".devN"
|
||||
#
|
||||
# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
|
||||
__version__ = '0.10.3.0.dev0'
|
||||
|
|
|
@ -22,7 +22,7 @@ from ducktape.tests.test import Test
|
|||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, compute_aggregate_throughput
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.version import TRUNK, KafkaVersion
|
||||
from kafkatest.version import DEV_BRANCH, KafkaVersion
|
||||
|
||||
TOPIC_REP_ONE = "topic-replication-factor-one"
|
||||
TOPIC_REP_THREE = "topic-replication-factor-three"
|
||||
|
@ -72,8 +72,8 @@ class Benchmark(Test):
|
|||
@cluster(num_nodes=7)
|
||||
@parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
|
||||
def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
|
||||
compression_type="none", security_protocol='PLAINTEXT', client_version=str(TRUNK),
|
||||
broker_version=str(TRUNK)):
|
||||
compression_type="none", security_protocol='PLAINTEXT', client_version=str(DEV_BRANCH),
|
||||
broker_version=str(DEV_BRANCH)):
|
||||
"""
|
||||
Setup: 1 node zk + 3 node kafka cluster
|
||||
Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
|
||||
|
@ -104,8 +104,8 @@ class Benchmark(Test):
|
|||
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
|
||||
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
|
||||
def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
|
||||
interbroker_security_protocol=None, client_version=str(TRUNK),
|
||||
broker_version=str(TRUNK)):
|
||||
interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
|
||||
broker_version=str(DEV_BRANCH)):
|
||||
"""
|
||||
Setup: 1 node zk + 3 node kafka cluster
|
||||
Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
|
||||
|
@ -162,8 +162,8 @@ class Benchmark(Test):
|
|||
@cluster(num_nodes=6)
|
||||
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
|
||||
def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
|
||||
interbroker_security_protocol=None, client_version=str(TRUNK),
|
||||
broker_version=str(TRUNK)):
|
||||
interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
|
||||
broker_version=str(DEV_BRANCH)):
|
||||
"""
|
||||
Setup: 1 node zk + 3 node kafka cluster
|
||||
Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
|
||||
|
@ -194,7 +194,7 @@ class Benchmark(Test):
|
|||
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
|
||||
def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT",
|
||||
interbroker_security_protocol=None, new_consumer=True,
|
||||
client_version=str(TRUNK), broker_version=str(TRUNK)):
|
||||
client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
|
||||
"""
|
||||
Setup: 1 node zk + 3 node kafka cluster
|
||||
Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
|
||||
|
@ -243,7 +243,7 @@ class Benchmark(Test):
|
|||
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
|
||||
def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT",
|
||||
interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
|
||||
client_version=str(TRUNK), broker_version=str(TRUNK)):
|
||||
client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
|
||||
"""
|
||||
Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
|
||||
(using new consumer iff new_consumer == True), and report throughput.
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
import importlib
|
||||
import os
|
||||
|
||||
from kafkatest.version import get_version, KafkaVersion, TRUNK
|
||||
from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH
|
||||
|
||||
|
||||
"""This module serves a few purposes:
|
||||
|
@ -43,7 +43,7 @@ TOOLS_JAR_NAME = "tools"
|
|||
TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME = "tools-dependant-libs"
|
||||
|
||||
JARS = {
|
||||
"trunk": {
|
||||
"dev": {
|
||||
CORE_JAR_NAME: "core/build/*/*.jar",
|
||||
CORE_LIBS_JAR_NAME: "core/build/libs/*.jar",
|
||||
CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "core/build/dependant-testlibs/*.jar",
|
||||
|
@ -97,7 +97,7 @@ class KafkaPathResolverMixin(object):
|
|||
class KafkaSystemTestPathResolver(object):
|
||||
"""Path resolver for Kafka system tests which assumes the following layout:
|
||||
|
||||
/opt/kafka-trunk # Current version of kafka under test
|
||||
/opt/kafka-dev # Current version of kafka under test
|
||||
/opt/kafka-0.9.0.1 # Example of an older version of kafka installed from tarball
|
||||
/opt/kafka-<version> # Other previous versions of kafka
|
||||
...
|
||||
|
@ -106,7 +106,7 @@ class KafkaSystemTestPathResolver(object):
|
|||
self.context = context
|
||||
self.project = project
|
||||
|
||||
def home(self, node_or_version=TRUNK):
|
||||
def home(self, node_or_version=DEV_BRANCH):
|
||||
version = self._version(node_or_version)
|
||||
home_dir = self.project
|
||||
if version is not None:
|
||||
|
@ -114,15 +114,15 @@ class KafkaSystemTestPathResolver(object):
|
|||
|
||||
return os.path.join(KAFKA_INSTALL_ROOT, home_dir)
|
||||
|
||||
def bin(self, node_or_version=TRUNK):
|
||||
def bin(self, node_or_version=DEV_BRANCH):
|
||||
version = self._version(node_or_version)
|
||||
return os.path.join(self.home(version), "bin")
|
||||
|
||||
def script(self, script_name, node_or_version=TRUNK):
|
||||
def script(self, script_name, node_or_version=DEV_BRANCH):
|
||||
version = self._version(node_or_version)
|
||||
return os.path.join(self.bin(version), script_name)
|
||||
|
||||
def jar(self, jar_name, node_or_version=TRUNK):
|
||||
def jar(self, jar_name, node_or_version=DEV_BRANCH):
|
||||
version = self._version(node_or_version)
|
||||
return os.path.join(self.home(version), JARS[str(version)][jar_name])
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ from ducktape.mark.resource import cluster
|
|||
from kafkatest.services.kafka import KafkaService, config_property
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.utils import is_version
|
||||
from kafkatest.version import LATEST_0_8_2, TRUNK
|
||||
from kafkatest.version import LATEST_0_8_2, DEV_BRANCH
|
||||
|
||||
|
||||
class KafkaVersionTest(Test):
|
||||
|
@ -47,12 +47,12 @@ class KafkaVersionTest(Test):
|
|||
@cluster(num_nodes=3)
|
||||
def test_multi_version(self):
|
||||
"""Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X,
|
||||
the other on trunk."""
|
||||
the other on the current development branch."""
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=2, zk=self.zk,
|
||||
topics={self.topic: {"partitions": 1, "replication-factor": 2}})
|
||||
self.kafka.nodes[1].version = LATEST_0_8_2
|
||||
self.kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
|
||||
self.kafka.start()
|
||||
|
||||
assert is_version(self.kafka.nodes[0], [TRUNK.vstring])
|
||||
assert is_version(self.kafka.nodes[0], [DEV_BRANCH.vstring])
|
||||
assert is_version(self.kafka.nodes[1], [LATEST_0_8_2])
|
||||
|
|
|
@ -21,7 +21,7 @@ from kafkatest.services.kafka import KafkaService
|
|||
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
|
||||
from kafkatest.services.performance import latency, compute_aggregate_throughput
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, KafkaVersion
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, KafkaVersion
|
||||
|
||||
|
||||
class PerformanceServiceTest(Test):
|
||||
|
@ -42,8 +42,8 @@ class PerformanceServiceTest(Test):
|
|||
@parametrize(version=str(LATEST_0_8_2), new_consumer=False)
|
||||
@parametrize(version=str(LATEST_0_9), new_consumer=False)
|
||||
@parametrize(version=str(LATEST_0_9))
|
||||
@parametrize(version=str(TRUNK), new_consumer=False)
|
||||
@parametrize(version=str(TRUNK))
|
||||
@parametrize(version=str(DEV_BRANCH), new_consumer=False)
|
||||
@parametrize(version=str(DEV_BRANCH))
|
||||
def test_version(self, version=str(LATEST_0_9), new_consumer=True):
|
||||
"""
|
||||
Sanity check out producer performance service - verify that we can run the service with a small
|
||||
|
|
|
@ -23,7 +23,7 @@ from kafkatest.services.kafka import KafkaService
|
|||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.utils import is_version
|
||||
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
|
||||
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, DEV_BRANCH, KafkaVersion
|
||||
|
||||
|
||||
class TestVerifiableProducer(Test):
|
||||
|
@ -48,10 +48,10 @@ class TestVerifiableProducer(Test):
|
|||
@cluster(num_nodes=3)
|
||||
@parametrize(producer_version=str(LATEST_0_8_2))
|
||||
@parametrize(producer_version=str(LATEST_0_9))
|
||||
@parametrize(producer_version=str(TRUNK))
|
||||
def test_simple_run(self, producer_version=TRUNK):
|
||||
@parametrize(producer_version=str(DEV_BRANCH))
|
||||
def test_simple_run(self, producer_version=DEV_BRANCH):
|
||||
"""
|
||||
Test that we can start VerifiableProducer on trunk or against the 0.8.2 jar, and
|
||||
Test that we can start VerifiableProducer on the current branch snapshot version or against the 0.8.2 jar, and
|
||||
verify that we can produce a small number of messages.
|
||||
"""
|
||||
node = self.producer.nodes[0]
|
||||
|
@ -61,11 +61,11 @@ class TestVerifiableProducer(Test):
|
|||
err_msg="Producer failed to start in a reasonable amount of time.")
|
||||
|
||||
# using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring
|
||||
# that this check works with TRUNK
|
||||
# When running VerifiableProducer 0.8.X, both trunk version and 0.8.X should show up because of the way
|
||||
# verifiable producer pulls in some trunk directories into its classpath
|
||||
# that this check works with DEV_BRANCH
|
||||
# When running VerifiableProducer 0.8.X, both the current branch version and 0.8.X should show up because of the
|
||||
# way verifiable producer pulls in some development directories into its classpath
|
||||
if node.version <= LATEST_0_8_2:
|
||||
assert is_version(node, [node.version.vstring, TRUNK.vstring])
|
||||
assert is_version(node, [node.version.vstring, DEV_BRANCH.vstring])
|
||||
else:
|
||||
assert is_version(node, [node.version.vstring])
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
|
|||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
from kafkatest.services.monitor.jmx import JmxMixin
|
||||
from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0
|
||||
|
||||
"""
|
||||
0.8.2.1 ConsoleConsumer options
|
||||
|
@ -87,7 +87,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
|
|||
}
|
||||
|
||||
def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
|
||||
message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
|
||||
message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH,
|
||||
client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
|
||||
enable_systest_events=False, stop_timeout_sec=15, print_timestamp=False):
|
||||
"""
|
||||
|
|
|
@ -30,7 +30,7 @@ from kafkatest.services.kafka import config_property
|
|||
from kafkatest.services.monitor.jmx import JmxMixin
|
||||
from kafkatest.services.security.minikdc import MiniKdc
|
||||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
from kafkatest.version import TRUNK
|
||||
from kafkatest.version import DEV_BRANCH
|
||||
|
||||
Port = collections.namedtuple('Port', ['name', 'number', 'open'])
|
||||
|
||||
|
@ -67,7 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
|
||||
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
|
||||
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
|
||||
authorizer_class_name=None, topics=None, version=TRUNK, jmx_object_names=None,
|
||||
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
|
||||
jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=[]):
|
||||
"""
|
||||
:type context
|
||||
|
|
|
@ -18,7 +18,7 @@ import os
|
|||
|
||||
from kafkatest.services.performance import PerformanceService
|
||||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
from kafkatest.version import TRUNK, V_0_9_0_0, LATEST_0_10_0
|
||||
from kafkatest.version import DEV_BRANCH, V_0_9_0_0, LATEST_0_10_0
|
||||
|
||||
|
||||
class ConsumerPerformanceService(PerformanceService):
|
||||
|
@ -70,7 +70,7 @@ class ConsumerPerformanceService(PerformanceService):
|
|||
"collect_default": True}
|
||||
}
|
||||
|
||||
def __init__(self, context, num_nodes, kafka, topic, messages, version=TRUNK, new_consumer=True, settings={}):
|
||||
def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANCH, new_consumer=True, settings={}):
|
||||
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
|
||||
self.kafka = kafka
|
||||
self.security_config = kafka.security_config.client_config()
|
||||
|
|
|
@ -17,7 +17,7 @@ import os
|
|||
|
||||
from kafkatest.services.performance import PerformanceService
|
||||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
from kafkatest.version import TRUNK, V_0_9_0_0
|
||||
from kafkatest.version import DEV_BRANCH, V_0_9_0_0
|
||||
|
||||
|
||||
|
||||
|
@ -45,7 +45,7 @@ class EndToEndLatencyService(PerformanceService):
|
|||
"collect_default": True}
|
||||
}
|
||||
|
||||
def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=TRUNK, acks=1):
|
||||
def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=DEV_BRANCH, acks=1):
|
||||
super(EndToEndLatencyService, self).__init__(context, num_nodes)
|
||||
self.kafka = kafka
|
||||
self.security_config = kafka.security_config.client_config()
|
||||
|
|
|
@ -22,7 +22,7 @@ from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME, TOOLS_DEPENDA
|
|||
from kafkatest.services.monitor.jmx import JmxMixin
|
||||
from kafkatest.services.performance import PerformanceService
|
||||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
from kafkatest.version import TRUNK, V_0_9_0_0
|
||||
from kafkatest.version import DEV_BRANCH, V_0_9_0_0
|
||||
|
||||
|
||||
class ProducerPerformanceService(JmxMixin, PerformanceService):
|
||||
|
@ -34,7 +34,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
|
|||
LOG_FILE = os.path.join(LOG_DIR, "producer_performance.log")
|
||||
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
|
||||
|
||||
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=TRUNK, settings=None,
|
||||
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=DEV_BRANCH, settings=None,
|
||||
intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=None):
|
||||
|
||||
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [])
|
||||
|
@ -89,11 +89,11 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
|
|||
|
||||
cmd = ""
|
||||
|
||||
if node.version < TRUNK:
|
||||
if node.version < DEV_BRANCH:
|
||||
# In order to ensure more consistent configuration between versions, always use the ProducerPerformance
|
||||
# tool from trunk
|
||||
tools_jar = self.path.jar(TOOLS_JAR_NAME, TRUNK)
|
||||
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, TRUNK)
|
||||
# tool from the development branch
|
||||
tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
|
||||
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
|
||||
|
||||
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
|
||||
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
|
||||
|
|
|
@ -24,7 +24,7 @@ from tempfile import mkstemp
|
|||
from ducktape.services.service import Service
|
||||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, CORE_LIBS_JAR_NAME, CORE_DEPENDANT_TEST_LIBS_JAR_NAME
|
||||
from kafkatest.version import TRUNK
|
||||
from kafkatest.version import DEV_BRANCH
|
||||
|
||||
|
||||
class MiniKdc(KafkaPathResolverMixin, Service):
|
||||
|
@ -103,8 +103,8 @@ class MiniKdc(KafkaPathResolverMixin, Service):
|
|||
principals = 'client ' + kafka_principals + ' ' + self.extra_principals
|
||||
self.logger.info("Starting MiniKdc with principals " + principals)
|
||||
|
||||
core_libs_jar = self.path.jar(CORE_LIBS_JAR_NAME, TRUNK)
|
||||
core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, TRUNK)
|
||||
core_libs_jar = self.path.jar(CORE_LIBS_JAR_NAME, DEV_BRANCH)
|
||||
core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
|
||||
|
||||
cmd = "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_libs_jar
|
||||
cmd += " for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_dependant_test_libs_jar
|
||||
|
|
|
@ -22,7 +22,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
|
|||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
from kafkatest.services.kafka import TopicPartition
|
||||
from kafkatest.version import TRUNK
|
||||
from kafkatest.version import DEV_BRANCH
|
||||
|
||||
|
||||
class ConsumerState:
|
||||
|
@ -136,7 +136,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
|
|||
def __init__(self, context, num_nodes, kafka, topic, group_id,
|
||||
max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
|
||||
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
|
||||
version=TRUNK, stop_timeout_sec=30):
|
||||
version=DEV_BRANCH, stop_timeout_sec=30):
|
||||
super(VerifiableConsumer, self).__init__(context, num_nodes)
|
||||
self.log_level = "TRACE"
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ from ducktape.utils.util import wait_until
|
|||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, TOOLS_JAR_NAME, TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
|
||||
from kafkatest.utils import is_int, is_int_with_prefix
|
||||
from kafkatest.version import TRUNK, LATEST_0_8_2
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2
|
||||
from kafkatest.utils.remote_account import line_count
|
||||
|
||||
|
||||
|
@ -50,7 +50,7 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
|
|||
}
|
||||
|
||||
def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
|
||||
message_validator=is_int, compression_types=None, version=TRUNK, acks=None,
|
||||
message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
|
||||
stop_timeout_sec=150):
|
||||
"""
|
||||
:param max_messages is a number of messages to be produced per producer
|
||||
|
@ -181,9 +181,9 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
|
|||
cmd = ""
|
||||
if node.version <= LATEST_0_8_2:
|
||||
# 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add
|
||||
# the tools jar from trunk to the classpath
|
||||
tools_jar = self.path.jar(TOOLS_JAR_NAME, TRUNK)
|
||||
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, TRUNK)
|
||||
# the tools jar from the development branch to the classpath
|
||||
tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
|
||||
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
|
||||
|
||||
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
|
||||
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
|
||||
|
|
|
@ -23,7 +23,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
|
|||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
from kafkatest.version import TRUNK
|
||||
from kafkatest.version import DEV_BRANCH
|
||||
|
||||
|
||||
class ZookeeperService(KafkaPathResolverMixin, Service):
|
||||
|
@ -122,7 +122,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
|
|||
"""
|
||||
Queries zookeeper for data associated with 'path' and returns all fields in the schema
|
||||
"""
|
||||
kafka_run_class = self.path.script("kafka-run-class.sh", TRUNK)
|
||||
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
|
||||
cmd = "%s kafka.tools.ZooKeeperMainWrapper -server %s get %s" % \
|
||||
(kafka_run_class, self.connect_setting(), path)
|
||||
self.logger.debug(cmd)
|
||||
|
|
|
@ -23,7 +23,7 @@ from ducktape.tests.test import TestContext
|
|||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from ducktape.tests.test import Test
|
||||
from kafkatest.version import TRUNK, LATEST_0_10_0, LATEST_0_10_1, V_0_10_1_0, KafkaVersion
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, V_0_10_1_0, KafkaVersion
|
||||
|
||||
def get_broker_features(broker_version):
|
||||
features = {}
|
||||
|
@ -93,7 +93,7 @@ class ClientCompatibilityFeaturesTest(Test):
|
|||
self.logger.info("** Command failed. See %s for log messages." % ssh_log_file)
|
||||
raise e
|
||||
|
||||
@parametrize(broker_version=str(TRUNK))
|
||||
@parametrize(broker_version=str(DEV_BRANCH))
|
||||
@parametrize(broker_version=str(LATEST_0_10_0))
|
||||
@parametrize(broker_version=str(LATEST_0_10_1))
|
||||
def run_compatibility_test(self, broker_version):
|
||||
|
|
|
@ -22,7 +22,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
|||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int_with_prefix
|
||||
from kafkatest.version import TRUNK, LATEST_0_10_0, LATEST_0_10_1, KafkaVersion
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, KafkaVersion
|
||||
|
||||
class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
|
||||
"""
|
||||
|
@ -52,7 +52,7 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
|
|||
# Override this since we're adding services outside of the constructor
|
||||
return super(ClientCompatibilityProduceConsumeTest, self).min_cluster_size() + self.num_producers + self.num_consumers
|
||||
|
||||
@parametrize(broker_version=str(TRUNK))
|
||||
@parametrize(broker_version=str(DEV_BRANCH))
|
||||
@parametrize(broker_version=str(LATEST_0_10_0))
|
||||
@parametrize(broker_version=str(LATEST_0_10_1))
|
||||
def test_produce_consume(self, broker_version):
|
||||
|
|
|
@ -22,7 +22,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
|||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
from kafkatest.version import LATEST_0_9, LATEST_0_10, TRUNK, KafkaVersion
|
||||
from kafkatest.version import LATEST_0_9, LATEST_0_10, DEV_BRANCH, KafkaVersion
|
||||
|
||||
|
||||
class MessageFormatChangeTest(ProduceConsumeValidateTest):
|
||||
|
@ -58,7 +58,7 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
|
|||
err_msg="Producer did not produce all messages in reasonable amount of time"))
|
||||
|
||||
@cluster(num_nodes=10)
|
||||
@parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK))
|
||||
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH))
|
||||
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9))
|
||||
def test_compatibility(self, producer_version, consumer_version):
|
||||
""" This tests performs the following checks:
|
||||
|
@ -70,7 +70,7 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
|
|||
- The producers and consumers should not have any issue.
|
||||
- Note that for 0.9.x consumers/producers we only do steps 1 and 2
|
||||
"""
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
|
||||
"partitions": 3,
|
||||
"replication-factor": 3,
|
||||
'configs': {"min.insync.replicas": 2}}})
|
||||
|
@ -84,7 +84,7 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
|
|||
self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
|
||||
self.produce_and_consume(producer_version, consumer_version, "group2")
|
||||
|
||||
if producer_version == str(TRUNK) and consumer_version == str(TRUNK):
|
||||
if producer_version == str(DEV_BRANCH) and consumer_version == str(DEV_BRANCH):
|
||||
self.logger.info("Third format change back to 0.9.0")
|
||||
self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
|
||||
self.produce_and_consume(producer_version, consumer_version, "group3")
|
||||
|
|
|
@ -23,7 +23,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
|||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
from kafkatest.version import LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, TRUNK, KafkaVersion
|
||||
from kafkatest.version import LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, DEV_BRANCH, KafkaVersion
|
||||
|
||||
|
||||
# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
|
||||
|
@ -47,17 +47,18 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
|
|||
@cluster(num_nodes=6)
|
||||
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
||||
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
||||
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
||||
@parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
||||
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=None)
|
||||
@parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
|
||||
@parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
||||
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
||||
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
|
||||
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None)
|
||||
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
|
||||
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
||||
@parametrize(producer_version=str(LATEST_0_10_1), consumer_version=str(LATEST_0_10_1), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
||||
@parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
||||
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
|
||||
@parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
|
||||
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
|
||||
def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
|
||||
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
|
||||
"partitions": 3,
|
||||
"replication-factor": 3,
|
||||
'configs': {"min.insync.replicas": 2}}})
|
||||
|
|
|
@ -25,7 +25,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
|||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, TRUNK, KafkaVersion
|
||||
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, DEV_BRANCH, KafkaVersion
|
||||
|
||||
class TestUpgrade(ProduceConsumeValidateTest):
|
||||
|
||||
|
@ -46,7 +46,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
|
|||
self.logger.info("First pass bounce - rolling upgrade")
|
||||
for node in self.kafka.nodes:
|
||||
self.kafka.stop_node(node)
|
||||
node.version = TRUNK
|
||||
node.version = DEV_BRANCH
|
||||
node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
|
||||
node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
|
||||
self.kafka.start_node(node)
|
||||
|
@ -62,6 +62,8 @@ class TestUpgrade(ProduceConsumeValidateTest):
|
|||
self.kafka.start_node(node)
|
||||
|
||||
@cluster(num_nodes=6)
|
||||
@parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
|
||||
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
|
||||
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False)
|
||||
|
@ -80,7 +82,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
|
|||
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
|
||||
def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
|
||||
new_consumer=True, security_protocol="PLAINTEXT"):
|
||||
"""Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0 or 0.10.0 to the current version
|
||||
"""Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0, 0.10.0, 0.10.1 to the current version
|
||||
|
||||
from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x
|
||||
|
||||
|
@ -112,7 +114,8 @@ class TestUpgrade(ProduceConsumeValidateTest):
|
|||
compression_types=compression_types,
|
||||
version=KafkaVersion(from_kafka_version))
|
||||
|
||||
assert self.zk.query("/cluster/id") is None
|
||||
if from_kafka_version <= LATEST_0_10_0:
|
||||
assert self.zk.query("/cluster/id") is None
|
||||
|
||||
# TODO - reduce the timeout
|
||||
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
|
||||
|
|
|
@ -31,11 +31,8 @@ class KafkaVersion(LooseVersion):
|
|||
assert v10 > v9 # assertion passes!
|
||||
"""
|
||||
def __init__(self, version_string):
|
||||
self.is_trunk = (version_string.lower() == "trunk")
|
||||
if self.is_trunk:
|
||||
# Since "trunk" may actually be a branch that is not trunk,
|
||||
# use kafkatest_version() for comparison purposes,
|
||||
# and track whether we're in "trunk" with a flag
|
||||
self.is_dev = (version_string.lower() == "dev")
|
||||
if self.is_dev:
|
||||
version_string = kafkatest_version()
|
||||
|
||||
# Drop dev suffix if present
|
||||
|
@ -48,22 +45,22 @@ class KafkaVersion(LooseVersion):
|
|||
LooseVersion.__init__(self, version_string)
|
||||
|
||||
def __str__(self):
|
||||
if self.is_trunk:
|
||||
return "trunk"
|
||||
if self.is_dev:
|
||||
return "dev"
|
||||
else:
|
||||
return LooseVersion.__str__(self)
|
||||
|
||||
|
||||
def get_version(node=None):
|
||||
"""Return the version attached to the given node.
|
||||
Default to trunk if node or node.version is undefined (aka None)
|
||||
Default to DEV_BRANCH if node or node.version is undefined (aka None)
|
||||
"""
|
||||
if node is not None and hasattr(node, "version") and node.version is not None:
|
||||
return node.version
|
||||
else:
|
||||
return TRUNK
|
||||
return DEV_BRANCH
|
||||
|
||||
TRUNK = KafkaVersion("trunk")
|
||||
DEV_BRANCH = KafkaVersion("dev")
|
||||
|
||||
# 0.8.2.X versions
|
||||
V_0_8_2_1 = KafkaVersion("0.8.2.1")
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
from kafkatest.directory_layout.kafka_path import create_path_resolver, KafkaSystemTestPathResolver, \
|
||||
KAFKA_PATH_RESOLVER_KEY
|
||||
from kafkatest.version import V_0_9_0_1, TRUNK, KafkaVersion
|
||||
from kafkatest.version import V_0_9_0_1, DEV_BRANCH, KafkaVersion
|
||||
|
||||
|
||||
class DummyContext(object):
|
||||
|
@ -56,9 +56,9 @@ class CheckCreatePathResolver(object):
|
|||
"""Check expected path resolution without any version specified."""
|
||||
resolver = create_path_resolver(DummyContext())
|
||||
|
||||
assert resolver.home() == "/opt/kafka-trunk"
|
||||
assert resolver.bin() == "/opt/kafka-trunk/bin"
|
||||
assert resolver.script("kafka-run-class.sh") == "/opt/kafka-trunk/bin/kafka-run-class.sh"
|
||||
assert resolver.home() == "/opt/kafka-dev"
|
||||
assert resolver.bin() == "/opt/kafka-dev/bin"
|
||||
assert resolver.script("kafka-run-class.sh") == "/opt/kafka-dev/bin/kafka-run-class.sh"
|
||||
|
||||
def check_versioned_source_paths(self):
|
||||
"""Check expected paths when using versions."""
|
||||
|
@ -74,16 +74,16 @@ class CheckCreatePathResolver(object):
|
|||
"""
|
||||
resolver = create_path_resolver(DummyContext())
|
||||
|
||||
# Node with no version attribute should resolve to TRUNK
|
||||
# Node with no version attribute should resolve to DEV_BRANCH
|
||||
node = DummyNode()
|
||||
assert resolver._version(node) == TRUNK
|
||||
assert resolver._version(node) == DEV_BRANCH
|
||||
|
||||
# Node with version attribute should resolve to the version attribute
|
||||
node.version = V_0_9_0_1
|
||||
assert resolver._version(node) == V_0_9_0_1
|
||||
|
||||
# A KafkaVersion object should resolve to itself
|
||||
assert resolver._version(TRUNK) == TRUNK
|
||||
assert resolver._version(DEV_BRANCH) == DEV_BRANCH
|
||||
version = KafkaVersion("999.999.999")
|
||||
assert resolver._version(version) == version
|
||||
|
||||
|
|
|
@ -15,18 +15,18 @@
|
|||
|
||||
from mock import Mock
|
||||
|
||||
from kafkatest.version import TRUNK, V_0_8_2_2, get_version
|
||||
from kafkatest.version import DEV_BRANCH, V_0_8_2_2, get_version
|
||||
|
||||
|
||||
class CheckVersion(object):
|
||||
def check_get_version(self):
|
||||
"""Check default and override behavior of get_version"""
|
||||
node = None
|
||||
assert get_version(node) == TRUNK
|
||||
assert get_version(node) == DEV_BRANCH
|
||||
|
||||
node = Mock()
|
||||
node.version = None
|
||||
assert get_version(node) == TRUNK
|
||||
assert get_version(node) == DEV_BRANCH
|
||||
|
||||
node = Mock()
|
||||
node.version = V_0_8_2_2
|
||||
|
|
|
@ -158,7 +158,7 @@ public class VerifiableLog4jAppender {
|
|||
*
|
||||
* Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate
|
||||
* but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests,
|
||||
* we use VerifiableProducer from trunk tools package, and run it against 0.8.X.X kafka jars.
|
||||
* we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
|
||||
* Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
|
||||
*/
|
||||
public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
|
||||
|
|
|
@ -160,7 +160,7 @@ public class VerifiableProducer {
|
|||
*
|
||||
* Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate
|
||||
* but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests,
|
||||
* we use VerifiableProducer from trunk tools package, and run it against 0.8.X.X kafka jars.
|
||||
* we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
|
||||
* Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
|
||||
*/
|
||||
public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
|
||||
|
|
|
@ -37,11 +37,11 @@ if [ -z `which javac` ]; then
|
|||
fi
|
||||
|
||||
chmod a+rw /opt
|
||||
if [ -h /opt/kafka-trunk ]; then
|
||||
if [ -h /opt/kafka-dev ]; then
|
||||
# reset symlink
|
||||
rm /opt/kafka-trunk
|
||||
rm /opt/kafka-dev
|
||||
fi
|
||||
ln -s /vagrant /opt/kafka-trunk
|
||||
ln -s /vagrant /opt/kafka-dev
|
||||
|
||||
get_kafka() {
|
||||
version=$1
|
||||
|
|
|
@ -23,7 +23,7 @@ PUBLIC_ADDRESS=$2
|
|||
PUBLIC_ZOOKEEPER_ADDRESSES=$3
|
||||
JMX_PORT=$4
|
||||
|
||||
kafka_dir=/opt/kafka-trunk
|
||||
kafka_dir=/opt/kafka-dev
|
||||
cd $kafka_dir
|
||||
|
||||
sed \
|
||||
|
|
|
@ -22,7 +22,7 @@ ZKID=$1
|
|||
NUM_ZK=$2
|
||||
JMX_PORT=$3
|
||||
|
||||
kafka_dir=/opt/kafka-trunk
|
||||
kafka_dir=/opt/kafka-dev
|
||||
cd $kafka_dir
|
||||
|
||||
cp $kafka_dir/config/zookeeper.properties $kafka_dir/config/zookeeper-$ZKID.properties
|
||||
|
|
Loading…
Reference in New Issue