kafka/tests/kafkatest/services/streams.py

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os.path
import signal
from . import streams_property
from . import consumer_property
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import KafkaConfig
from kafkatest.services.monitor.jmx import JmxMixin
from .kafka.util import get_log4j_config_param, get_log4j_config_for_tools
STATE_DIR = "state.dir"
class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
"""Base class for Streams Test services providing some common settings and functionality"""
PERSISTENT_ROOT = "/mnt/streams"
# The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "streams.properties")
LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
JMX_LOG_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.log")
JMX_ERR_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.err.log")
PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
CLEAN_NODE_ENABLED = True
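    # Log artifacts collected from each node. Besides the primary config, log,
    # stdout, and stderr files, the rolled-over and per-instance variants that the
    # tests may produce (suffixes such as ".1" or ".0-1") are listed explicitly so
    # they are gathered as well.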
logs = {
"streams_config": {
"path": CONFIG_FILE,
"collect_default": True},
"streams_config.1": {
"path": CONFIG_FILE + ".1",
"collect_default": True},
"streams_config.0-1": {
"path": CONFIG_FILE + ".0-1",
"collect_default": True},
"streams_config.1-1": {
"path": CONFIG_FILE + ".1-1",
"collect_default": True},
"streams_log": {
"path": LOG_FILE,
"collect_default": True},
"streams_stdout": {
"path": STDOUT_FILE,
"collect_default": True},
"streams_stderr": {
"path": STDERR_FILE,
"collect_default": True},
"streams_log.1": {
"path": LOG_FILE + ".1",
"collect_default": True},
"streams_stdout.1": {
"path": STDOUT_FILE + ".1",
"collect_default": True},
"streams_stderr.1": {
"path": STDERR_FILE + ".1",
"collect_default": True},
"streams_log.2": {
"path": LOG_FILE + ".2",
"collect_default": True},
"streams_stdout.2": {
"path": STDOUT_FILE + ".2",
"collect_default": True},
"streams_stderr.2": {
"path": STDERR_FILE + ".2",
"collect_default": True},
"streams_log.3": {
"path": LOG_FILE + ".3",
"collect_default": True},
"streams_stdout.3": {
"path": STDOUT_FILE + ".3",
"collect_default": True},
"streams_stderr.3": {
"path": STDERR_FILE + ".3",
"collect_default": True},
"streams_log.0-1": {
"path": LOG_FILE + ".0-1",
"collect_default": True},
"streams_stdout.0-1": {
"path": STDOUT_FILE + ".0-1",
"collect_default": True},
"streams_stderr.0-1": {
"path": STDERR_FILE + ".0-1",
"collect_default": True},
"streams_log.0-2": {
"path": LOG_FILE + ".0-2",
"collect_default": True},
"streams_stdout.0-2": {
"path": STDOUT_FILE + ".0-2",
"collect_default": True},
"streams_stderr.0-2": {
"path": STDERR_FILE + ".0-2",
"collect_default": True},
"streams_log.0-3": {
"path": LOG_FILE + ".0-3",
"collect_default": True},
"streams_stdout.0-3": {
"path": STDOUT_FILE + ".0-3",
"collect_default": True},
"streams_stderr.0-3": {
"path": STDERR_FILE + ".0-3",
"collect_default": True},
"streams_log.0-4": {
"path": LOG_FILE + ".0-4",
"collect_default": True},
"streams_stdout.0-4": {
"path": STDOUT_FILE + ".0-4",
"collect_default": True},
"streams_stderr.0-4": {
"path": STDERR_FILE + ".0-4",
"collect_default": True},
"streams_log.0-5": {
"path": LOG_FILE + ".0-5",
"collect_default": True},
"streams_stdout.0-5": {
"path": STDOUT_FILE + ".0-5",
"collect_default": True},
"streams_stderr.0-5": {
"path": STDERR_FILE + ".0-5",
"collect_default": True},
"streams_log.0-6": {
"path": LOG_FILE + ".0-6",
"collect_default": True},
"streams_stdout.0-6": {
"path": STDOUT_FILE + ".0-6",
"collect_default": True},
"streams_stderr.0-6": {
"path": STDERR_FILE + ".0-6",
"collect_default": True},
"streams_log.1-1": {
"path": LOG_FILE + ".1-1",
"collect_default": True},
"streams_stdout.1-1": {
"path": STDOUT_FILE + ".1-1",
"collect_default": True},
"streams_stderr.1-1": {
"path": STDERR_FILE + ".1-1",
"collect_default": True},
"streams_log.1-2": {
"path": LOG_FILE + ".1-2",
"collect_default": True},
"streams_stdout.1-2": {
"path": STDOUT_FILE + ".1-2",
"collect_default": True},
"streams_stderr.1-2": {
"path": STDERR_FILE + ".1-2",
"collect_default": True},
"streams_log.1-3": {
"path": LOG_FILE + ".1-3",
"collect_default": True},
"streams_stdout.1-3": {
"path": STDOUT_FILE + ".1-3",
"collect_default": True},
"streams_stderr.1-3": {
"path": STDERR_FILE + ".1-3",
"collect_default": True},
"streams_log.1-4": {
"path": LOG_FILE + ".1-4",
"collect_default": True},
"streams_stdout.1-4": {
"path": STDOUT_FILE + ".1-4",
"collect_default": True},
"streams_stderr.1-4": {
"path": STDERR_FILE + ".1-4",
"collect_default": True},
"streams_log.1-5": {
"path": LOG_FILE + ".1-5",
"collect_default": True},
"streams_stdout.1-5": {
"path": STDOUT_FILE + ".1-5",
"collect_default": True},
"streams_stderr.1-5": {
"path": STDERR_FILE + ".1-5",
"collect_default": True},
"streams_log.1-6": {
"path": LOG_FILE + ".1-6",
"collect_default": True},
"streams_stdout.1-6": {
"path": STDOUT_FILE + ".1-6",
"collect_default": True},
"streams_stderr.1-6": {
"path": STDERR_FILE + ".1-6",
"collect_default": True},
"jmx_log": {
"path": JMX_LOG_FILE,
"collect_default": True},
"jmx_err": {
"path": JMX_ERR_FILE,
"collect_default": True},
}
def __init__(self, test_context, kafka, streams_class_name, user_test_args1, user_test_args2=None, user_test_args3=None, user_test_args4=None):
Service.__init__(self, test_context, num_nodes=1)
self.kafka = kafka
self.args = {'streams_class_name': streams_class_name,
'user_test_args1': user_test_args1,
'user_test_args2': user_test_args2,
'user_test_args3': user_test_args3,
'user_test_args4': user_test_args4}
self.log_level = "DEBUG"
@property
def node(self):
return self.nodes[0]
@property
def expectedMessage(self):
return 'StreamsTest instance started'
def pids(self, node):
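        # The pid file may legitimately be missing (e.g. before the first start or
        # after a clean stop), in which case an empty list is returned.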
try:
pids = [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=str)]
return [int(pid) for pid in pids]
except Exception as exception:
self.logger.debug(str(exception))
return []
def stop_nodes(self, clean_shutdown=True):
for node in self.nodes:
self.stop_node(node, clean_shutdown)
def stop_node(self, node, clean_shutdown=True):
self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Test on " + str(node.account))
pids = self.pids(node)
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
node.account.signal(pid, sig, allow_fail=True)
if clean_shutdown:
for pid in pids:
wait_until(lambda: not node.account.alive(pid), timeout_sec=120, err_msg="Streams Test process on " + str(node.account) + " took too long to exit")
node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
def restart(self):
# We don't want to do any clean up here, just restart the process.
for node in self.nodes:
self.logger.info("Restarting Kafka Streams on " + str(node.account))
self.stop_node(node)
self.start_node(node)
def abortThenRestart(self):
# We don't want to do any clean up here, just abort then restart the process. The running service is killed immediately.
for node in self.nodes:
self.logger.info("Aborting Kafka Streams on " + str(node.account))
self.stop_node(node, False)
self.logger.info("Restarting Kafka Streams on " + str(node.account))
self.start_node(node)
def wait(self, timeout_sec=1440):
for node in self.nodes:
self.wait_node(node, timeout_sec)
def wait_node(self, node, timeout_sec=None):
for pid in self.pids(node):
wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, err_msg="Streams Test process on " + str(node.account) + " took too long to exit")
def clean_node(self, node):
node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
if self.CLEAN_NODE_ENABLED:
node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
def start_cmd(self, node):
args = self.args.copy()
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j_param'] = get_log4j_config_param(node)
args['log4j'] = get_log4j_config_for_tools(node)
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
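        # The process is launched in the background inside a subshell; file
        # descriptor 3 of the subshell is redirected to the pid file, so
        # "echo $! >&3" records the background JVM's pid while stdout and stderr
        # are appended to their capture files.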
cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
" %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing streams cmd: " + cmd)
return cmd
def prop_file(self):
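        # Minimal base configuration: just the state directory and bootstrap
        # servers. Subclasses override this to add workload-specific properties.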
cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT, streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()})
return cfg.render()
def start_node(self, node):
node.account.mkdirs(self.PERSISTENT_ROOT)
prop_file = self.prop_file()
node.account.create_file(self.CONFIG_FILE, prop_file)
node.account.create_file(get_log4j_config_for_tools(node), self.render(get_log4j_config_for_tools(node), log_file=self.LOG_FILE))
self.logger.info("Starting StreamsTest process on " + str(node.account))
with node.account.monitor_log(self.STDOUT_FILE) as monitor:
node.account.ssh(self.start_cmd(node))
monitor.wait_until(self.expectedMessage, timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
if not self.pids(node):
raise RuntimeError("No process ids recorded")
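# A minimal usage sketch (assuming a ducktape `test_context` and a running
# KafkaService instance `kafka`), illustrating the common lifecycle of these
# services:
#
#   processor = StreamsSmokeTestJobRunnerService(test_context, kafka, "at_least_once")
#   processor.start()        # render streams.properties, launch the JVM, wait for the startup message
#   ...
#   processor.stop_nodes()   # SIGTERM by default; pass clean_shutdown=False for SIGKILL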
class StreamsSmokeTestBaseService(StreamsTestBaseService):
"""Base class for Streams Smoke Test services providing some common settings and functionality"""
def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3):
super(StreamsSmokeTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsSmokeTest",
command)
self.NUM_THREADS = num_threads
self.PROCESSING_GUARANTEE = processing_guarantee
self.KAFKA_STREAMS_VERSION = ""
self.UPGRADE_FROM = None
self.REPLICATION_FACTOR = replication_factor
def set_version(self, kafka_streams_version):
self.KAFKA_STREAMS_VERSION = kafka_streams_version
def set_upgrade_from(self, upgrade_from):
self.UPGRADE_FROM = upgrade_from
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE,
streams_property.NUM_THREADS: self.NUM_THREADS,
"replication.factor": self.REPLICATION_FACTOR,
"num.standby.replicas": 2,
"buffered.records.per.partition": 100,
"commit.interval.ms": 1000,
"auto.offset.reset": "earliest",
"acks": "all",
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
}
if self.UPGRADE_FROM is not None:
properties['upgrade.from'] = self.UPGRADE_FROM
cfg = KafkaConfig(**properties)
return cfg.render()
def start_cmd(self, node):
args = self.args.copy()
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j_param'] = get_log4j_config_param(node)
args['log4j'] = get_log4j_config_for_tools(node)
args['version'] = self.KAFKA_STREAMS_VERSION
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
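        # UPGRADE_KAFKA_STREAMS_TEST_VERSION is consumed by kafka-run-class.sh to
        # put the matching streams upgrade-system-tests jar on the classpath, so
        # the same command can run against an older Streams version.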
cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\";" \
" INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s" \
" %(kafka_run_class)s %(streams_class_name)s" \
" %(config_file)s %(user_test_args1)s" \
" & echo $! >&3 ) " \
"1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing streams cmd: " + cmd)
return cmd
class StreamsEosTestBaseService(StreamsTestBaseService):
"""Base class for Streams EOS Test services providing some common settings and functionality"""
clean_node_enabled = True
def __init__(self, test_context, kafka, command):
super(StreamsEosTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsEosTest",
command)
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.PROCESSING_GUARANTEE: "exactly_once_v2",
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
}
cfg = KafkaConfig(**properties)
return cfg.render()
def clean_node(self, node):
if self.clean_node_enabled:
super(StreamsEosTestBaseService, self).clean_node(node)
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
self.DISABLE_AUTO_TERMINATE = ""
def disable_auto_terminate(self):
self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
def start_cmd(self, node):
args = self.args.copy()
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j_param'] = get_log4j_config_param(node)
args['log4j'] = get_log4j_config_for_tools(node)
args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(config_file)s %(user_test_args1)s %(disable_auto_terminate)s" \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing streams cmd: " + cmd)
return cmd
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka, processing_guarantee, num_threads = 3, replication_factor = 3):
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee, num_threads, replication_factor)
class StreamsEosTestDriverService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run")
class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex")
class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, "close-deadlock-test")
class StreamsBrokerCompatibilityService(StreamsTestBaseService):
def __init__(self, test_context, kafka, processingMode):
super(StreamsBrokerCompatibilityService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
processingMode)
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
# the old broker (< 2.4) does not support configuration replication.factor=-1
"replication.factor": 1,
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
}
cfg = KafkaConfig(**properties)
return cfg.render()
class StreamsBrokerDownResilienceService(StreamsTestBaseService):
def __init__(self, test_context, kafka, configs):
super(StreamsBrokerDownResilienceService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest",
configs)
def start_cmd(self, node):
args = self.args.copy()
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j_param'] = get_log4j_config_param(node)
args['log4j'] = get_log4j_config_for_tools(node)
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
" %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing: " + cmd)
return cmd
class StreamsStandbyTaskService(StreamsTestBaseService):
def __init__(self, test_context, kafka, configs):
super(StreamsStandbyTaskService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsStandByReplicaTest",
configs)
class StreamsResetter(StreamsTestBaseService):
def __init__(self, test_context, kafka, topic, applicationId):
super(StreamsResetter, self).__init__(test_context,
kafka,
"org.apache.kafka.tools.StreamsResetter",
"")
self.topic = topic
self.applicationId = applicationId
@property
def expectedMessage(self):
return 'Done.'
def start_cmd(self, node):
args = self.args.copy()
args['bootstrap.servers'] = self.kafka.bootstrap_servers()
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j_param'] = get_log4j_config_param(node)
args['log4j'] = get_log4j_config_for_tools(node)
args['application.id'] = self.applicationId
args['input.topics'] = self.topic
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
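        # Runs the reset tool against the configured application id and input
        # topic; completion is detected via the "Done." message on stdout (see
        # expectedMessage above).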
cmd = "(export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\";" \
"%(kafka_run_class)s %(streams_class_name)s " \
"--bootstrap-server %(bootstrap.servers)s " \
"--force " \
"--application-id %(application.id)s " \
"--input-topics %(input.topics)s " \
"& echo $! >&3 ) " \
"1>> %(stdout)s " \
"2>> %(stderr)s " \
"3> %(pidfile)s "% args
self.logger.info("Executing: " + cmd)
return cmd
class StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsOptimizedUpgradeTestService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsOptimizedTest",
"")
self.OPTIMIZED_CONFIG = 'none'
self.INPUT_TOPIC = None
self.AGGREGATION_TOPIC = None
self.REDUCE_TOPIC = None
self.JOIN_TOPIC = None
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
'topology.optimization': self.OPTIMIZED_CONFIG,
'input.topic': self.INPUT_TOPIC,
'aggregation.topic': self.AGGREGATION_TOPIC,
'reduce.topic': self.REDUCE_TOPIC,
'join.topic': self.JOIN_TOPIC,
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
}
cfg = KafkaConfig(**properties)
return cfg.render()
class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsUpgradeTest",
"")
self.UPGRADE_FROM = None
self.UPGRADE_TO = None
self.extra_properties = {}
def set_config(self, key, value):
self.extra_properties[key] = value
def set_version(self, kafka_streams_version):
self.KAFKA_STREAMS_VERSION = kafka_streams_version
def set_upgrade_from(self, upgrade_from):
self.UPGRADE_FROM = upgrade_from
def set_upgrade_to(self, upgrade_to):
self.UPGRADE_TO = upgrade_to
def prop_file(self):
properties = self.extra_properties.copy()
properties[streams_property.STATE_DIR] = self.PERSISTENT_ROOT
properties[streams_property.KAFKA_SERVERS] = self.kafka.bootstrap_servers()
if self.UPGRADE_FROM is not None:
properties['upgrade.from'] = self.UPGRADE_FROM
if self.UPGRADE_TO == "future_version":
properties['test.future.metadata'] = "any_value"
# Long.MAX_VALUE lets us do the assignment without a warmup
properties['acceptable.recovery.lag'] = "9223372036854775807"
properties["session.timeout.ms"] = "10000" # set back to 10s for tests. See KIP-735
cfg = KafkaConfig(**properties)
return cfg.render()
def start_cmd(self, node):
args = self.args.copy()
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j_param'] = get_log4j_config_param(node)
args['log4j'] = get_log4j_config_for_tools(node)
args['version'] = self.KAFKA_STREAMS_VERSION
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
" %(kafka_run_class)s %(streams_class_name)s %(config_file)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing: " + cmd)
return cmd
class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsNamedRepartitionTopicService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsNamedRepartitionTest",
"")
self.ADD_ADDITIONAL_OPS = 'false'
self.INPUT_TOPIC = None
self.AGGREGATION_TOPIC = None
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
'input.topic': self.INPUT_TOPIC,
'aggregation.topic': self.AGGREGATION_TOPIC,
'add.operations': self.ADD_ADDITIONAL_OPS,
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
}
cfg = KafkaConfig(**properties)
return cfg.render()
class StaticMemberTestService(StreamsTestBaseService):
def __init__(self, test_context, kafka, group_instance_id, num_threads):
super(StaticMemberTestService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StaticMemberTestClient",
"")
self.INPUT_TOPIC = None
self.GROUP_INSTANCE_ID = group_instance_id
self.NUM_THREADS = num_threads
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.NUM_THREADS: self.NUM_THREADS,
consumer_property.GROUP_INSTANCE_ID: self.GROUP_INSTANCE_ID,
consumer_property.SESSION_TIMEOUT_MS: 60000, # set longer session timeout for static member test
'input.topic': self.INPUT_TOPIC,
"acceptable.recovery.lag": "9223372036854775807" # enable a one-shot assignment
}
cfg = KafkaConfig(**properties)
return cfg.render()
class CooperativeRebalanceUpgradeService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
super(CooperativeRebalanceUpgradeService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsUpgradeToCooperativeRebalanceTest",
"")
self.UPGRADE_FROM = None
# these properties will be overridden in test
self.SOURCE_TOPIC = None
self.SINK_TOPIC = None
self.TASK_DELIMITER = "#"
self.REPORT_INTERVAL = None
self.standby_tasks = None
self.active_tasks = None
self.upgrade_phase = None
def set_tasks(self, task_string):
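        # Parses a line of the form "TASK-ASSIGNMENTS:0_0,0_1#1_0": the part before
        # the delimiter lists the active tasks, the optional part after it lists
        # the standby tasks.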
label = "TASK-ASSIGNMENTS:"
task_string_substr = task_string[len(label):]
all_tasks = task_string_substr.split(self.TASK_DELIMITER)
self.active_tasks = set(all_tasks[0].split(","))
if len(all_tasks) > 1:
self.standby_tasks = set(all_tasks[1].split(","))
def set_version(self, kafka_streams_version):
self.KAFKA_STREAMS_VERSION = kafka_streams_version
def set_upgrade_phase(self, upgrade_phase):
self.upgrade_phase = upgrade_phase
def start_cmd(self, node):
args = self.args.copy()
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j_param'] = get_log4j_config_param(node)
args['log4j'] = get_log4j_config_for_tools(node)
args['version'] = self.KAFKA_STREAMS_VERSION
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
" %(kafka_run_class)s %(streams_class_name)s %(config_file)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing: " + cmd)
return cmd
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
'source.topic': self.SOURCE_TOPIC,
'sink.topic': self.SINK_TOPIC,
'task.delimiter': self.TASK_DELIMITER,
'report.interval': self.REPORT_INTERVAL,
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
}
if self.UPGRADE_FROM is not None:
properties['upgrade.from'] = self.UPGRADE_FROM
else:
try:
del properties['upgrade.from']
except KeyError:
self.logger.info("Key 'upgrade.from' not there, better safe than sorry")
if self.upgrade_phase is not None:
properties['upgrade.phase'] = self.upgrade_phase
cfg = KafkaConfig(**properties)
return cfg.render()