KAFKA-7834: Extend collected logs in system test services to include heap dumps

* Enable heap dumps on OOM with -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=<file.bin> in the major services in system tests
* Collect the heap dump from the predefined location as part of the result logs for each service
* Change Connect service to delete the whole root directory instead of individual expected files
* Tested by running the full suite of system tests

Author: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #6158 from kkonstantine/KAFKA-7834

(cherry picked from commit 83c435f3ba)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
This commit is contained in:
Konstantine Karantasis 2019-02-04 16:46:03 -08:00 committed by Ewen Cheslack-Postava
parent 47bb1a4668
commit d96c7eae0b
3 changed files with 39 additions and 8 deletions

View File

@ -42,6 +42,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
EXTERNAL_CONFIGS_FILE = os.path.join(PERSISTENT_ROOT, "connect-external-configs.properties")
CONNECT_REST_PORT = 8083
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "connect_heap_dump.bin")
# Currently the Connect worker supports waiting on three modes:
STARTUP_MODE_INSTANT = 'INSTANT'
@ -61,6 +62,9 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
"connect_stderr": {
"path": STDERR_FILE,
"collect_default": True},
"connect_heap_dump_file": {
"path": HEAP_DUMP_FILE,
"collect_default": True}
}
def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 60):
@ -160,8 +164,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
def clean_node(self, node):
node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
self.security_config.clean_node(node)
all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE, self.EXTERNAL_CONFIGS_FILE] + self.config_filenames() + self.files)
node.account.ssh("rm -rf " + all_files, allow_fail=False)
other_files = " ".join(self.config_filenames() + self.files)
node.account.ssh("rm -rf -- %s %s" % (ConnectServiceBase.PERSISTENT_ROOT, other_files), allow_fail=False)
def config_filenames(self):
return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])]
@ -252,6 +256,14 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
def _base_url(self, node):
return 'http://' + node.account.externally_routable_ip + ':' + str(self.CONNECT_REST_PORT)
def append_to_environment_variable(self, envvar, value):
env_opts = self.environment[envvar]
if env_opts is None:
env_opts = "\"%s\"" % value
else:
env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
self.environment[envvar] = env_opts
class ConnectStandaloneService(ConnectServiceBase):
"""Runs Kafka Connect in standalone mode."""
@ -266,7 +278,10 @@ class ConnectStandaloneService(ConnectServiceBase):
def start_cmd(self, node, connector_configs):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
self.logs["connect_heap_dump_file"]["path"]
other_kafka_opts = self.security_config.kafka_opts.strip('\"')
cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
for envvar in self.environment:
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), self.CONFIG_FILE)
@ -314,7 +329,10 @@ class ConnectDistributedService(ConnectServiceBase):
# connector_configs argument is intentionally ignored in distributed service.
def start_cmd(self, node, connector_configs):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
self.logs["connect_heap_dump_file"]["path"]
other_kafka_opts = self.security_config.kafka_opts.strip('\"')
cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
for envvar in self.environment:
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), self.CONFIG_FILE)

View File

@ -49,6 +49,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
# Kafka Authorizer
SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
logs = {
"kafka_server_start_stdout_stderr": {
@ -65,7 +66,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
"collect_default": False},
"kafka_data_2": {
"path": DATA_LOG_DIR_2,
"collect_default": False}
"collect_default": False},
"kafka_heap_dump_file": {
"path": HEAP_DUMP_FILE,
"collect_default": True}
}
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
@ -229,7 +233,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def start_cmd(self, node):
cmd = "export JMX_PORT=%d; " % self.jmx_port
cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % \
self.logs["kafka_heap_dump_file"]["path"]
other_kafka_opts = self.security_config.kafka_opts.strip('\"')
cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, other_kafka_opts)
cmd += "%s %s 1>> %s 2>> %s &" % \
(self.path.script("kafka-server-start.sh", node),
KafkaService.CONFIG_FILE,

View File

@ -30,6 +30,7 @@ from kafkatest.version import DEV_BRANCH
class ZookeeperService(KafkaPathResolverMixin, Service):
ROOT = "/mnt/zookeeper"
DATA = os.path.join(ROOT, "data")
HEAP_DUMP_FILE = os.path.join(ROOT, "zk_heap_dump.bin")
logs = {
"zk_log": {
@ -37,7 +38,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
"collect_default": True},
"zk_data": {
"path": DATA,
"collect_default": False}
"collect_default": False},
"zk_heap_dump_file": {
"path": HEAP_DUMP_FILE,
"collect_default": True}
}
def __init__(self, context, num_nodes, zk_sasl = False):
@ -76,8 +80,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
self.logger.info(config_file)
node.account.create_file("%s/zookeeper.properties" % ZookeeperService.ROOT, config_file)
start_cmd = "export KAFKA_OPTS=\"%s\";" % (self.kafka_opts + ' ' + self.security_system_properties) \
heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s" % self.logs["zk_heap_dump_file"]["path"]
other_kafka_opts = self.kafka_opts + ' ' + self.security_system_properties \
if self.security_config.zk_sasl else self.kafka_opts
start_cmd = "export KAFKA_OPTS=\"%s %s\";" % (heap_kafka_opts, other_kafka_opts)
start_cmd += "%s " % self.path.script("zookeeper-server-start.sh", node)
start_cmd += "%s/zookeeper.properties &>> %s &" % (ZookeeperService.ROOT, self.logs["zk_log"]["path"])
node.account.ssh(start_cmd)