KAFKA-9573: Fix JVM options to run early versions of Kafka on the latest JVMs (#8138)

Startup scripts for the early version of Kafka contain removed JVM options like `-XX:+PrintGCDateStamps` or `-XX:UseParNewGC`. 
When system tests run on JVM that doesn't support these options we should set up
environment variables with correct options.

Reviewers: Guozhang Wang <guozhang@confluent.io>, Ron Dagostino <rdagostino@confluent.io>, Ismael Juma <ismael@juma.me.uk
This commit is contained in:
Nikolay 2020-03-25 20:31:07 +03:00 committed by GitHub
parent 4c9e56b358
commit befd80b38d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 94 additions and 17 deletions

View File

@ -25,6 +25,7 @@ from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
class ConnectServiceBase(KafkaPathResolverMixin, Service):
@ -280,6 +281,8 @@ class ConnectStandaloneService(ConnectServiceBase):
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
self.logs["connect_heap_dump_file"]["path"]
other_kafka_opts = self.security_config.kafka_opts.strip('\"')
cmd += fix_opts_for_new_jvm(node)
cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
for envvar in self.environment:
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))

View File

@ -23,6 +23,7 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_9_0_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
"""
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
@ -167,7 +168,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
else:
args['kafka_opts'] = self.security_config.kafka_opts
cmd = "export JMX_PORT=%(jmx_port)s; " \
cmd = fix_opts_for_new_jvm(node)
cmd += "export JMX_PORT=%(jmx_port)s; " \
"export LOG_DIR=%(log_dir)s; " \
"export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
"export KAFKA_OPTS=%(kafka_opts)s; " \

View File

@ -31,7 +31,8 @@ from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.security.minikdc import MiniKdc
from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
class KafkaListener:
@ -341,6 +342,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
self.logs["kafka_heap_dump_file"]["path"]
security_kafka_opts = self.security_config.kafka_opts.strip('\"')
cmd += fix_opts_for_new_jvm(node)
cmd += "export KAFKA_OPTS=\"%s %s %s\"; " % (heap_kafka_opts, security_kafka_opts, self.extra_kafka_opts)
cmd += "%s %s 1>> %s 2>> %s &" % \
(self.path.script("kafka-server-start.sh", node),
@ -455,7 +458,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
use_zk_connection = topic_cfg.get('if-not-exists', False) or use_zk_to_create_topic
cmd = "%(kafka_topics_cmd)s %(connection_string)s --create --topic %(topic)s " % {
cmd = fix_opts_for_new_jvm(node)
cmd += "%(kafka_topics_cmd)s %(connection_string)s --create --topic %(topic)s " % {
'kafka_topics_cmd': self._kafka_topics_cmd(node, use_zk_connection),
'connection_string': self._connect_setting(node, use_zk_connection),
'topic': topic_cfg.get("topic"),
@ -494,7 +498,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.info("Deleting topic %s" % topic)
kafka_topic_script = self.path.script("kafka-topics.sh", node)
cmd = kafka_topic_script + " "
cmd = fix_opts_for_new_jvm(node)
cmd += kafka_topic_script + " "
cmd += "--bootstrap-server %(bootstrap_servers)s --delete --topic %(topic)s " % {
'bootstrap_servers': self.bootstrap_servers(self.security_protocol),
'topic': topic
@ -505,7 +510,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def describe_topic(self, topic, node=None, use_zk_to_describe_topic=True):
if node is None:
node = self.nodes[0]
cmd = "%s %s --topic %s --describe %s" % \
cmd = fix_opts_for_new_jvm(node)
cmd += "%s %s --topic %s --describe %s" % \
(self._kafka_topics_cmd(node=node, use_zk_connection=use_zk_to_describe_topic),
self._connect_setting(node=node, use_zk_connection=use_zk_to_describe_topic),
topic, self._kafka_topics_cmd_config(node=node, use_zk_connection=use_zk_to_describe_topic))
@ -519,7 +525,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def list_topics(self, node=None, use_zk_to_list_topic=True):
if node is None:
node = self.nodes[0]
cmd = "%s %s --list %s" % (self._kafka_topics_cmd(node, use_zk_to_list_topic),
cmd = fix_opts_for_new_jvm(node)
cmd += "%s %s --list %s" % (self._kafka_topics_cmd(node, use_zk_to_list_topic),
self._connect_setting(node, use_zk_to_list_topic),
self._kafka_topics_cmd_config(node, use_zk_to_list_topic))
for line in node.account.ssh_capture(cmd):
@ -530,7 +538,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if node is None:
node = self.nodes[0]
self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version)
cmd = "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
cmd = fix_opts_for_new_jvm(node)
cmd += "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
(self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), self.zk.zkTlsConfigFileOption(), topic, msg_format_version)
self.logger.info("Running alter message format command...\n%s" % cmd)
node.account.ssh(cmd)
@ -542,7 +552,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.info("Enabling unclean leader election for topic %s", topic)
else:
self.logger.info("Disabling unclean leader election for topic %s", topic)
cmd = "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
cmd = fix_opts_for_new_jvm(node)
cmd += "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
(self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), self.zk.zkTlsConfigFileOption(), topic, str(value).lower())
self.logger.info("Running alter unclean leader command...\n%s" % cmd)
node.account.ssh(cmd)
@ -589,7 +601,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
json_str = json.dumps(json_str)
# create command
cmd = "echo %s > %s && " % (json_str, json_file)
cmd = fix_opts_for_new_jvm(node)
cmd += "echo %s > %s && " % (json_str, json_file)
cmd += "%s " % self.path.script("kafka-reassign-partitions.sh", node)
cmd += "--zookeeper %s " % self.zk_connect_setting()
cmd += "--reassignment-json-file %s " % json_file
@ -628,7 +641,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
json_str = json.dumps(json_str)
# create command
cmd = "echo %s > %s && " % (json_str, json_file)
cmd = fix_opts_for_new_jvm(node)
cmd += "echo %s > %s && " % (json_str, json_file)
cmd += "%s " % self.path.script( "kafka-reassign-partitions.sh", node)
cmd += "--zookeeper %s " % self.zk_connect_setting()
cmd += "--reassignment-json-file %s " % json_file
@ -663,7 +677,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
# Check each data file to see if it contains the messages we want
for log in files:
cmd = "%s kafka.tools.DumpLogSegments --print-data-log --files %s | grep -E \"%s\"" % \
cmd = fix_opts_for_new_jvm(node)
cmd += "%s kafka.tools.DumpLogSegments --print-data-log --files %s | grep -E \"%s\"" % \
(self.path.script("kafka-run-class.sh", node), log.strip(), payload_match)
for line in node.account.ssh_capture(cmd, allow_fail=True):
@ -779,7 +794,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
else:
command_config = "--command-config " + command_config
cmd = "%s --bootstrap-server %s %s --list" % \
cmd = fix_opts_for_new_jvm(node)
cmd += "%s --bootstrap-server %s %s --list" % \
(consumer_group_script,
self.bootstrap_servers(self.security_protocol),
command_config)
@ -803,7 +819,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
else:
command_config = "--command-config " + command_config
cmd = "%s --bootstrap-server %s %s --group %s --describe" % \
cmd = fix_opts_for_new_jvm(node)
cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
(consumer_group_script,
self.bootstrap_servers(self.security_protocol),
command_config, group)
@ -877,7 +894,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time):
node = self.nodes[0]
cmd = self.path.script("kafka-run-class.sh", node)
cmd = fix_opts_for_new_jvm(node)
cmd += self.path.script("kafka-run-class.sh", node)
cmd += " kafka.tools.GetOffsetShell"
cmd += " --topic %s --broker-list %s --max-wait-ms %s --offsets %s --time %s" % (topic, self.bootstrap_servers(self.security_protocol), max_wait_ms, offsets, time)

View File

@ -13,6 +13,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os.path
from collections import namedtuple
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0
from kafkatest.directory_layout.kafka_path import create_path_resolver
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
new_jdk_not_supported = frozenset([str(LATEST_0_8_2), str(LATEST_0_9), str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0)])
def fix_opts_for_new_jvm(node):
# Startup scripts for early versions of Kafka contains options
# that not supported on latest versions of JVM like -XX:+PrintGCDateStamps or -XX:UseParNewGC.
# When system test run on JVM that doesn't support these options
# we should setup environment variables with correct options.
java_ver = java_version(node)
if java_ver <= 9:
return ""
cmd = ""
if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version == LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0:
cmd += "export KAFKA_GC_LOG_OPTS=\"-Xlog:gc*:file=kafka-gc.log:time,tags:filecount=10,filesize=102400\"; "
cmd += "export KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true\"; "
return cmd
def java_version(node):
# Determine java version on the node
version = -1
for line in node.account.ssh_capture("java -version"):
if line.find("version") != -1:
version = parse_version_str(line)
return version
def parse_version_str(line):
# Parse java version string. Examples:
#`openjdk version "11.0.5" 2019-10-15` will return 11.
#`java version "1.5.0"` will return 5.
line = line[line.find('version \"') + 9:]
dot_pos = line.find(".")
if line[:dot_pos] == "1":
return int(line[dot_pos+1:line.find(".", dot_pos+1)])
else:
return int(line[:dot_pos])

View File

@ -17,6 +17,7 @@ from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
@ -44,7 +45,8 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
node.account.ssh(cmd)
def start_cmd(self, node):
cmd = self.path.script("kafka-run-class.sh", node)
cmd = fix_opts_for_new_jvm(node)
cmd += self.path.script("kafka-run-class.sh", node)
cmd += " "
cmd += self.java_class_name()
cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol))

View File

@ -21,6 +21,7 @@ import importlib
import os
import subprocess
import signal
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
"""This module abstracts the implementation of a verifiable client, allowing
@ -243,6 +244,7 @@ class VerifiableClientJava (VerifiableClientMixin):
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
cmd += "export CLASSPATH; "
cmd += fix_opts_for_new_jvm(node)
cmd += self.parent.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." + self.java_class_name
return cmd

View File

@ -24,6 +24,7 @@ from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_client import VerifiableClientMixin
from kafkatest.utils import is_int, is_int_with_prefix
from kafkatest.version import DEV_BRANCH
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
@ -220,6 +221,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
else:
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += fix_opts_for_new_jvm(node)
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
cmd += self.impl.exec_cmd(node)
cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol, True, self.offline_nodes))

View File

@ -24,6 +24,7 @@ from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
from kafkatest.services.kafka.util import java_version, new_jdk_not_supported
class TestUpgrade(ProduceConsumeValidateTest):
@ -126,10 +127,17 @@ class TestUpgrade(ProduceConsumeValidateTest):
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
version=KafkaVersion(from_kafka_version),
topics={self.topic: {"partitions": self.partitions,
"replication-factor": self.replication_factor,
'configs': {"min.insync.replicas": 2}}})
"replication-factor": self.replication_factor,
'configs': {"min.insync.replicas": 2}}})
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
jdk_version = java_version(self.kafka.nodes[0])
if jdk_version > 9 and from_kafka_version in new_jdk_not_supported:
self.logger.info("Test ignored! Kafka " + from_kafka_version + " not support jdk " + str(jdk_version))
return
self.kafka.start()
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,