mirror of https://github.com/apache/kafka.git
KAFKA-9573: Fix JVM options to run early versions of Kafka on the latest JVMs (#8138)
Startup scripts for the early version of Kafka contain removed JVM options like `-XX:+PrintGCDateStamps` or `-XX:UseParNewGC`. When system tests run on JVM that doesn't support these options we should set up environment variables with correct options. Reviewers: Guozhang Wang <guozhang@confluent.io>, Ron Dagostino <rdagostino@confluent.io>, Ismael Juma <ismael@juma.me.uk
This commit is contained in:
parent
4c9e56b358
commit
befd80b38d
|
@ -25,6 +25,7 @@ from ducktape.services.service import Service
|
|||
from ducktape.utils.util import wait_until
|
||||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
||||
|
||||
|
||||
class ConnectServiceBase(KafkaPathResolverMixin, Service):
|
||||
|
@ -280,6 +281,8 @@ class ConnectStandaloneService(ConnectServiceBase):
|
|||
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
|
||||
self.logs["connect_heap_dump_file"]["path"]
|
||||
other_kafka_opts = self.security_config.kafka_opts.strip('\"')
|
||||
|
||||
cmd += fix_opts_for_new_jvm(node)
|
||||
cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
|
||||
for envvar in self.environment:
|
||||
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
|
||||
|
|
|
@ -23,6 +23,7 @@ from ducktape.utils.util import wait_until
|
|||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
|
||||
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
||||
|
||||
"""
|
||||
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
|
||||
|
@ -167,7 +168,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
|
|||
else:
|
||||
args['kafka_opts'] = self.security_config.kafka_opts
|
||||
|
||||
cmd = "export JMX_PORT=%(jmx_port)s; " \
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "export JMX_PORT=%(jmx_port)s; " \
|
||||
"export LOG_DIR=%(log_dir)s; " \
|
||||
"export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
|
||||
"export KAFKA_OPTS=%(kafka_opts)s; " \
|
||||
|
|
|
@ -31,7 +31,8 @@ from kafkatest.services.monitor.jmx import JmxMixin
|
|||
from kafkatest.services.security.minikdc import MiniKdc
|
||||
from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
|
||||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2
|
||||
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
||||
|
||||
|
||||
class KafkaListener:
|
||||
|
@ -341,6 +342,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
|
||||
self.logs["kafka_heap_dump_file"]["path"]
|
||||
security_kafka_opts = self.security_config.kafka_opts.strip('\"')
|
||||
|
||||
cmd += fix_opts_for_new_jvm(node)
|
||||
cmd += "export KAFKA_OPTS=\"%s %s %s\"; " % (heap_kafka_opts, security_kafka_opts, self.extra_kafka_opts)
|
||||
cmd += "%s %s 1>> %s 2>> %s &" % \
|
||||
(self.path.script("kafka-server-start.sh", node),
|
||||
|
@ -455,7 +458,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
|
||||
use_zk_connection = topic_cfg.get('if-not-exists', False) or use_zk_to_create_topic
|
||||
|
||||
cmd = "%(kafka_topics_cmd)s %(connection_string)s --create --topic %(topic)s " % {
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%(kafka_topics_cmd)s %(connection_string)s --create --topic %(topic)s " % {
|
||||
'kafka_topics_cmd': self._kafka_topics_cmd(node, use_zk_connection),
|
||||
'connection_string': self._connect_setting(node, use_zk_connection),
|
||||
'topic': topic_cfg.get("topic"),
|
||||
|
@ -494,7 +498,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
self.logger.info("Deleting topic %s" % topic)
|
||||
kafka_topic_script = self.path.script("kafka-topics.sh", node)
|
||||
|
||||
cmd = kafka_topic_script + " "
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += kafka_topic_script + " "
|
||||
cmd += "--bootstrap-server %(bootstrap_servers)s --delete --topic %(topic)s " % {
|
||||
'bootstrap_servers': self.bootstrap_servers(self.security_protocol),
|
||||
'topic': topic
|
||||
|
@ -505,7 +510,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
def describe_topic(self, topic, node=None, use_zk_to_describe_topic=True):
|
||||
if node is None:
|
||||
node = self.nodes[0]
|
||||
cmd = "%s %s --topic %s --describe %s" % \
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s %s --topic %s --describe %s" % \
|
||||
(self._kafka_topics_cmd(node=node, use_zk_connection=use_zk_to_describe_topic),
|
||||
self._connect_setting(node=node, use_zk_connection=use_zk_to_describe_topic),
|
||||
topic, self._kafka_topics_cmd_config(node=node, use_zk_connection=use_zk_to_describe_topic))
|
||||
|
@ -519,7 +525,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
def list_topics(self, node=None, use_zk_to_list_topic=True):
|
||||
if node is None:
|
||||
node = self.nodes[0]
|
||||
cmd = "%s %s --list %s" % (self._kafka_topics_cmd(node, use_zk_to_list_topic),
|
||||
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s %s --list %s" % (self._kafka_topics_cmd(node, use_zk_to_list_topic),
|
||||
self._connect_setting(node, use_zk_to_list_topic),
|
||||
self._kafka_topics_cmd_config(node, use_zk_to_list_topic))
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
|
@ -530,7 +538,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
if node is None:
|
||||
node = self.nodes[0]
|
||||
self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version)
|
||||
cmd = "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
|
||||
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
|
||||
(self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), self.zk.zkTlsConfigFileOption(), topic, msg_format_version)
|
||||
self.logger.info("Running alter message format command...\n%s" % cmd)
|
||||
node.account.ssh(cmd)
|
||||
|
@ -542,7 +552,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
self.logger.info("Enabling unclean leader election for topic %s", topic)
|
||||
else:
|
||||
self.logger.info("Disabling unclean leader election for topic %s", topic)
|
||||
cmd = "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
|
||||
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
|
||||
(self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), self.zk.zkTlsConfigFileOption(), topic, str(value).lower())
|
||||
self.logger.info("Running alter unclean leader command...\n%s" % cmd)
|
||||
node.account.ssh(cmd)
|
||||
|
@ -589,7 +601,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
json_str = json.dumps(json_str)
|
||||
|
||||
# create command
|
||||
cmd = "echo %s > %s && " % (json_str, json_file)
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "echo %s > %s && " % (json_str, json_file)
|
||||
cmd += "%s " % self.path.script("kafka-reassign-partitions.sh", node)
|
||||
cmd += "--zookeeper %s " % self.zk_connect_setting()
|
||||
cmd += "--reassignment-json-file %s " % json_file
|
||||
|
@ -628,7 +641,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
json_str = json.dumps(json_str)
|
||||
|
||||
# create command
|
||||
cmd = "echo %s > %s && " % (json_str, json_file)
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "echo %s > %s && " % (json_str, json_file)
|
||||
cmd += "%s " % self.path.script( "kafka-reassign-partitions.sh", node)
|
||||
cmd += "--zookeeper %s " % self.zk_connect_setting()
|
||||
cmd += "--reassignment-json-file %s " % json_file
|
||||
|
@ -663,7 +677,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
|
||||
# Check each data file to see if it contains the messages we want
|
||||
for log in files:
|
||||
cmd = "%s kafka.tools.DumpLogSegments --print-data-log --files %s | grep -E \"%s\"" % \
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s kafka.tools.DumpLogSegments --print-data-log --files %s | grep -E \"%s\"" % \
|
||||
(self.path.script("kafka-run-class.sh", node), log.strip(), payload_match)
|
||||
|
||||
for line in node.account.ssh_capture(cmd, allow_fail=True):
|
||||
|
@ -779,7 +794,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
else:
|
||||
command_config = "--command-config " + command_config
|
||||
|
||||
cmd = "%s --bootstrap-server %s %s --list" % \
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s --bootstrap-server %s %s --list" % \
|
||||
(consumer_group_script,
|
||||
self.bootstrap_servers(self.security_protocol),
|
||||
command_config)
|
||||
|
@ -803,7 +819,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
else:
|
||||
command_config = "--command-config " + command_config
|
||||
|
||||
cmd = "%s --bootstrap-server %s %s --group %s --describe" % \
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
|
||||
(consumer_group_script,
|
||||
self.bootstrap_servers(self.security_protocol),
|
||||
command_config, group)
|
||||
|
@ -877,7 +894,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time):
|
||||
node = self.nodes[0]
|
||||
|
||||
cmd = self.path.script("kafka-run-class.sh", node)
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += self.path.script("kafka-run-class.sh", node)
|
||||
cmd += " kafka.tools.GetOffsetShell"
|
||||
cmd += " --topic %s --broker-list %s --max-wait-ms %s --offsets %s --time %s" % (topic, self.bootstrap_servers(self.security_protocol), max_wait_ms, offsets, time)
|
||||
|
||||
|
|
|
@ -13,6 +13,46 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os.path
|
||||
|
||||
from collections import namedtuple
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0
|
||||
from kafkatest.directory_layout.kafka_path import create_path_resolver
|
||||
|
||||
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
|
||||
|
||||
new_jdk_not_supported = frozenset([str(LATEST_0_8_2), str(LATEST_0_9), str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0)])
|
||||
|
||||
def fix_opts_for_new_jvm(node):
|
||||
# Startup scripts for early versions of Kafka contains options
|
||||
# that not supported on latest versions of JVM like -XX:+PrintGCDateStamps or -XX:UseParNewGC.
|
||||
# When system test run on JVM that doesn't support these options
|
||||
# we should setup environment variables with correct options.
|
||||
java_ver = java_version(node)
|
||||
if java_ver <= 9:
|
||||
return ""
|
||||
|
||||
cmd = ""
|
||||
if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version == LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0:
|
||||
cmd += "export KAFKA_GC_LOG_OPTS=\"-Xlog:gc*:file=kafka-gc.log:time,tags:filecount=10,filesize=102400\"; "
|
||||
cmd += "export KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true\"; "
|
||||
return cmd
|
||||
|
||||
def java_version(node):
|
||||
# Determine java version on the node
|
||||
version = -1
|
||||
for line in node.account.ssh_capture("java -version"):
|
||||
if line.find("version") != -1:
|
||||
version = parse_version_str(line)
|
||||
return version
|
||||
|
||||
def parse_version_str(line):
|
||||
# Parse java version string. Examples:
|
||||
#`openjdk version "11.0.5" 2019-10-15` will return 11.
|
||||
#`java version "1.5.0"` will return 5.
|
||||
line = line[line.find('version \"') + 9:]
|
||||
dot_pos = line.find(".")
|
||||
if line[:dot_pos] == "1":
|
||||
return int(line[dot_pos+1:line.find(".", dot_pos+1)])
|
||||
else:
|
||||
return int(line[:dot_pos])
|
||||
|
|
|
@ -17,6 +17,7 @@ from ducktape.services.background_thread import BackgroundThreadService
|
|||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
from kafkatest.services.security.security_config import SecurityConfig
|
||||
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
||||
|
||||
|
||||
class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
|
||||
|
@ -44,7 +45,8 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
|
|||
node.account.ssh(cmd)
|
||||
|
||||
def start_cmd(self, node):
|
||||
cmd = self.path.script("kafka-run-class.sh", node)
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += self.path.script("kafka-run-class.sh", node)
|
||||
cmd += " "
|
||||
cmd += self.java_class_name()
|
||||
cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol))
|
||||
|
|
|
@ -21,6 +21,7 @@ import importlib
|
|||
import os
|
||||
import subprocess
|
||||
import signal
|
||||
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
||||
|
||||
|
||||
"""This module abstracts the implementation of a verifiable client, allowing
|
||||
|
@ -243,6 +244,7 @@ class VerifiableClientJava (VerifiableClientMixin):
|
|||
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
|
||||
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
|
||||
cmd += "export CLASSPATH; "
|
||||
cmd += fix_opts_for_new_jvm(node)
|
||||
cmd += self.parent.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." + self.java_class_name
|
||||
return cmd
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ from kafkatest.services.kafka import TopicPartition
|
|||
from kafkatest.services.verifiable_client import VerifiableClientMixin
|
||||
from kafkatest.utils import is_int, is_int_with_prefix
|
||||
from kafkatest.version import DEV_BRANCH
|
||||
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
||||
|
||||
|
||||
class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
|
||||
|
@ -220,6 +221,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
|
|||
else:
|
||||
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
|
||||
|
||||
cmd += fix_opts_for_new_jvm(node)
|
||||
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
|
||||
cmd += self.impl.exec_cmd(node)
|
||||
cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol, True, self.offline_nodes))
|
||||
|
|
|
@ -24,6 +24,7 @@ from kafkatest.services.zookeeper import ZookeeperService
|
|||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
|
||||
from kafkatest.services.kafka.util import java_version, new_jdk_not_supported
|
||||
|
||||
class TestUpgrade(ProduceConsumeValidateTest):
|
||||
|
||||
|
@ -126,10 +127,17 @@ class TestUpgrade(ProduceConsumeValidateTest):
|
|||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
|
||||
version=KafkaVersion(from_kafka_version),
|
||||
topics={self.topic: {"partitions": self.partitions,
|
||||
"replication-factor": self.replication_factor,
|
||||
'configs': {"min.insync.replicas": 2}}})
|
||||
"replication-factor": self.replication_factor,
|
||||
'configs': {"min.insync.replicas": 2}}})
|
||||
self.kafka.security_protocol = security_protocol
|
||||
self.kafka.interbroker_security_protocol = security_protocol
|
||||
|
||||
jdk_version = java_version(self.kafka.nodes[0])
|
||||
|
||||
if jdk_version > 9 and from_kafka_version in new_jdk_not_supported:
|
||||
self.logger.info("Test ignored! Kafka " + from_kafka_version + " not support jdk " + str(jdk_version))
|
||||
return
|
||||
|
||||
self.kafka.start()
|
||||
|
||||
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
|
||||
|
|
Loading…
Reference in New Issue