2016-02-24 04:14:26 +08:00
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
|
|
# this work for additional information regarding copyright ownership.
|
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
|
# (the "License"); you may not use this file except in compliance with
|
|
|
|
# the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
|
2016-05-07 02:10:27 +08:00
|
|
|
import os.path
|
|
|
|
import signal
|
2018-04-16 01:15:31 +08:00
|
|
|
import streams_property
|
2019-06-08 04:52:12 +08:00
|
|
|
import consumer_property
|
2016-02-24 04:14:26 +08:00
|
|
|
from ducktape.services.service import Service
|
|
|
|
from ducktape.utils.util import wait_until
|
2016-05-07 02:10:27 +08:00
|
|
|
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
2018-03-20 05:17:00 +08:00
|
|
|
from kafkatest.services.kafka import KafkaConfig
|
2018-04-27 01:04:59 +08:00
|
|
|
from kafkatest.services.monitor.jmx import JmxMixin
|
2019-12-21 06:21:12 +08:00
|
|
|
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
|
2016-04-30 01:28:33 +08:00
|
|
|
|
2018-04-27 01:04:59 +08:00
|
|
|
# Config key string for the Streams state directory ("state.dir"). Not referenced
# elsewhere in this file -- the prop_file() implementations use
# streams_property.STATE_DIR instead; presumably kept for external importers (verify
# before removing).
STATE_DIR = "state.dir"
|
2018-03-20 05:17:00 +08:00
|
|
|
|
2018-03-23 07:46:56 +08:00
|
|
|
class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
    """Base class for Streams Test services providing some common settings and functionality.

    Runs a single Java test class (``streams_class_name``) via ``kafka-run-class.sh`` on
    one node. The properties file, log4j config, stdout/stderr and a pid file all live
    under ``PERSISTENT_ROOT``.
    """

    PERSISTENT_ROOT = "/mnt/streams"

    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "streams.properties")
    # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
    LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
    STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
    STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
    JMX_LOG_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.log")
    JMX_ERR_FILE = os.path.join(PERSISTENT_ROOT, "jmx_tool.err.log")
    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
    PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")

    # When False, clean_node() still kills the process but leaves PERSISTENT_ROOT on disk.
    CLEAN_NODE_ENABLED = True

    # Log files collected from the node. The suffixed variants (".1", ".0-1", ...) are
    # presumably produced by tests that rename the base files between runs -- confirm
    # with the test code before pruning entries.
    logs = {
        "streams_log": {"path": LOG_FILE, "collect_default": True},
        "streams_stdout": {"path": STDOUT_FILE, "collect_default": True},
        "streams_stderr": {"path": STDERR_FILE, "collect_default": True},
        "streams_log.1": {"path": LOG_FILE + ".1", "collect_default": True},
        "streams_stdout.1": {"path": STDOUT_FILE + ".1", "collect_default": True},
        "streams_stderr.1": {"path": STDERR_FILE + ".1", "collect_default": True},
        "streams_log.2": {"path": LOG_FILE + ".2", "collect_default": True},
        "streams_stdout.2": {"path": STDOUT_FILE + ".2", "collect_default": True},
        "streams_stderr.2": {"path": STDERR_FILE + ".2", "collect_default": True},
        "streams_log.3": {"path": LOG_FILE + ".3", "collect_default": True},
        "streams_stdout.3": {"path": STDOUT_FILE + ".3", "collect_default": True},
        "streams_stderr.3": {"path": STDERR_FILE + ".3", "collect_default": True},
        "streams_log.0-1": {"path": LOG_FILE + ".0-1", "collect_default": True},
        "streams_stdout.0-1": {"path": STDOUT_FILE + ".0-1", "collect_default": True},
        "streams_stderr.0-1": {"path": STDERR_FILE + ".0-1", "collect_default": True},
        "streams_log.0-2": {"path": LOG_FILE + ".0-2", "collect_default": True},
        "streams_stdout.0-2": {"path": STDOUT_FILE + ".0-2", "collect_default": True},
        "streams_stderr.0-2": {"path": STDERR_FILE + ".0-2", "collect_default": True},
        "streams_log.0-3": {"path": LOG_FILE + ".0-3", "collect_default": True},
        "streams_stdout.0-3": {"path": STDOUT_FILE + ".0-3", "collect_default": True},
        "streams_stderr.0-3": {"path": STDERR_FILE + ".0-3", "collect_default": True},
        "streams_log.0-4": {"path": LOG_FILE + ".0-4", "collect_default": True},
        "streams_stdout.0-4": {"path": STDOUT_FILE + ".0-4", "collect_default": True},
        "streams_stderr.0-4": {"path": STDERR_FILE + ".0-4", "collect_default": True},
        "streams_log.0-5": {"path": LOG_FILE + ".0-5", "collect_default": True},
        "streams_stdout.0-5": {"path": STDOUT_FILE + ".0-5", "collect_default": True},
        "streams_stderr.0-5": {"path": STDERR_FILE + ".0-5", "collect_default": True},
        "streams_log.0-6": {"path": LOG_FILE + ".0-6", "collect_default": True},
        "streams_stdout.0-6": {"path": STDOUT_FILE + ".0-6", "collect_default": True},
        "streams_stderr.0-6": {"path": STDERR_FILE + ".0-6", "collect_default": True},
        "streams_log.1-1": {"path": LOG_FILE + ".1-1", "collect_default": True},
        "streams_stdout.1-1": {"path": STDOUT_FILE + ".1-1", "collect_default": True},
        "streams_stderr.1-1": {"path": STDERR_FILE + ".1-1", "collect_default": True},
        "streams_log.1-2": {"path": LOG_FILE + ".1-2", "collect_default": True},
        "streams_stdout.1-2": {"path": STDOUT_FILE + ".1-2", "collect_default": True},
        "streams_stderr.1-2": {"path": STDERR_FILE + ".1-2", "collect_default": True},
        "streams_log.1-3": {"path": LOG_FILE + ".1-3", "collect_default": True},
        "streams_stdout.1-3": {"path": STDOUT_FILE + ".1-3", "collect_default": True},
        "streams_stderr.1-3": {"path": STDERR_FILE + ".1-3", "collect_default": True},
        "streams_log.1-4": {"path": LOG_FILE + ".1-4", "collect_default": True},
        "streams_stdout.1-4": {"path": STDOUT_FILE + ".1-4", "collect_default": True},
        "streams_stderr.1-4": {"path": STDERR_FILE + ".1-4", "collect_default": True},
        "streams_log.1-5": {"path": LOG_FILE + ".1-5", "collect_default": True},
        "streams_stdout.1-5": {"path": STDOUT_FILE + ".1-5", "collect_default": True},
        "streams_stderr.1-5": {"path": STDERR_FILE + ".1-5", "collect_default": True},
        "streams_log.1-6": {"path": LOG_FILE + ".1-6", "collect_default": True},
        "streams_stdout.1-6": {"path": STDOUT_FILE + ".1-6", "collect_default": True},
        "streams_stderr.1-6": {"path": STDERR_FILE + ".1-6", "collect_default": True},
        "jmx_log": {"path": JMX_LOG_FILE, "collect_default": True},
        "jmx_err": {"path": JMX_ERR_FILE, "collect_default": True},
    }

    def __init__(self, test_context, kafka, streams_class_name, user_test_args1, user_test_args2=None, user_test_args3=None, user_test_args4=None):
        """Configure a one-node service that runs ``streams_class_name``.

        The four ``user_test_args*`` values are substituted into the launch command with
        ``%s``; any left as ``None`` therefore appear as the literal string ``None`` on
        the Java command line.
        """
        Service.__init__(self, test_context, num_nodes=1)
        self.kafka = kafka
        self.args = {'streams_class_name': streams_class_name,
                     'user_test_args1': user_test_args1,
                     'user_test_args2': user_test_args2,
                     'user_test_args3': user_test_args3,
                     'user_test_args4': user_test_args4}
        self.log_level = "DEBUG"

    @property
    def node(self):
        """The single node this service runs on."""
        return self.nodes[0]

    def pids(self, node):
        """Return the pids recorded in PID_FILE on ``node``, or ``[]`` if none can be read."""
        try:
            return list(node.account.ssh_capture("cat " + self.PID_FILE, callback=int))
        except Exception:
            # e.g. the pid file does not exist or its content is not an integer. Catch
            # Exception instead of using a bare except so KeyboardInterrupt/SystemExit
            # still propagate.
            return []

    def stop_nodes(self, clean_shutdown=True):
        """Stop the Streams process on every node (see stop_node)."""
        for node in self.nodes:
            self.stop_node(node, clean_shutdown)

    def stop_node(self, node, clean_shutdown=True):
        """Signal every recorded pid on ``node`` and remove PID_FILE.

        A clean shutdown sends SIGTERM and waits up to 120s per process for it to exit.
        Otherwise SIGKILL is sent and nothing is awaited.
        """
        self.logger.info(("Cleanly" if clean_shutdown else "Forcibly") + " stopping Streams Test on " + str(node.account))
        pids = self.pids(node)
        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL

        for pid in pids:
            node.account.signal(pid, sig, allow_fail=True)
        if clean_shutdown:
            for pid in pids:
                # wait_until runs synchronously, so the lambda sees the current pid.
                wait_until(lambda: not node.account.alive(pid), timeout_sec=120, err_msg="Streams Test process on " + str(node.account) + " took too long to exit")

        node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)

    def restart(self):
        """Cleanly stop and start the process on every node without cleaning state."""
        # We don't want to do any clean up here, just restart the process.
        for node in self.nodes:
            self.logger.info("Restarting Kafka Streams on " + str(node.account))
            self.stop_node(node)
            self.start_node(node)

    def abortThenRestart(self):
        """SIGKILL then start the process on every node without cleaning state."""
        # We don't want to do any clean up here, just abort then restart the process. The running service is killed immediately.
        for node in self.nodes:
            self.logger.info("Aborting Kafka Streams on " + str(node.account))
            self.stop_node(node, False)
            self.logger.info("Restarting Kafka Streams on " + str(node.account))
            self.start_node(node)

    def wait(self, timeout_sec=1440):
        """Block until the process on every node has exited (per-process timeout)."""
        for node in self.nodes:
            self.wait_node(node, timeout_sec)

    def wait_node(self, node, timeout_sec=None):
        """Block until every recorded pid on ``node`` has exited.

        ``timeout_sec`` is forwarded unchanged to ``wait_until``; ``wait()`` always
        supplies a value.
        """
        for pid in self.pids(node):
            wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, err_msg="Streams Test process on " + str(node.account) + " took too long to exit")

    def clean_node(self, node):
        """Best-effort kill of "streams" processes, then remove PERSISTENT_ROOT if enabled."""
        node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
        if self.CLEAN_NODE_ENABLED:
            node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)

    def start_cmd(self, node):
        """Build the shell command that launches the test class in the background.

        stdout/stderr are appended to STDOUT_FILE/STDERR_FILE and the background pid
        is written to PID_FILE through file descriptor 3.
        """
        args = self.args.copy()
        args['config_file'] = self.CONFIG_FILE
        args['stdout'] = self.STDOUT_FILE
        args['stderr'] = self.STDERR_FILE
        args['pidfile'] = self.PID_FILE
        args['log4j'] = self.LOG4J_CONFIG_FILE
        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)

        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
              "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
              " %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
              " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args

        self.logger.info("Executing streams cmd: " + cmd)

        return cmd

    def prop_file(self):
        """Render the minimal properties: state dir and bootstrap servers."""
        cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT, streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()})
        return cfg.render()

    def start_node(self, node):
        """Write config files, launch the process and wait for its startup message.

        Raises:
            RuntimeError: if no pid was recorded after startup.
        """
        node.account.mkdirs(self.PERSISTENT_ROOT)
        prop_file = self.prop_file()
        node.account.create_file(self.CONFIG_FILE, prop_file)
        node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))

        self.logger.info("Starting StreamsTest process on " + str(node.account))
        with node.account.monitor_log(self.STDOUT_FILE) as monitor:
            node.account.ssh(self.start_cmd(node))
            monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))

        if len(self.pids(node)) == 0:
            raise RuntimeError("No process ids recorded")
|
|
|
|
|
|
|
|
|
2017-01-19 03:55:23 +08:00
|
|
|
class StreamsSmokeTestBaseService(StreamsTestBaseService):
    """Shared base for services that run org.apache.kafka.streams.tests.StreamsSmokeTest.

    ``command`` is passed to the Java class as its first user argument. The processing
    guarantee and thread count are written into the generated properties file.
    """

    def __init__(self, test_context, kafka, command, processing_guarantee='at_least_once', num_threads=3):
        smoke_class = "org.apache.kafka.streams.tests.StreamsSmokeTest"
        super(StreamsSmokeTestBaseService, self).__init__(test_context, kafka, smoke_class, command)
        self.NUM_THREADS = num_threads
        self.PROCESSING_GUARANTEE = processing_guarantee

    def prop_file(self):
        """Render state dir, bootstrap servers, processing guarantee and thread count."""
        properties = {}
        properties[streams_property.STATE_DIR] = self.PERSISTENT_ROOT
        properties[streams_property.KAFKA_SERVERS] = self.kafka.bootstrap_servers()
        properties[streams_property.PROCESSING_GUARANTEE] = self.PROCESSING_GUARANTEE
        properties[streams_property.NUM_THREADS] = self.NUM_THREADS
        return KafkaConfig(**properties).render()
|
2017-01-19 03:55:23 +08:00
|
|
|
|
2017-06-09 05:08:54 +08:00
|
|
|
class StreamsEosTestBaseService(StreamsTestBaseService):
    """Shared base for services that run org.apache.kafka.streams.tests.StreamsEosTest."""

    # When False, clean_node() does nothing at all for this service: the parent's
    # process kill and directory removal are both skipped.
    clean_node_enabled = True

    def __init__(self, test_context, kafka, command):
        eos_class = "org.apache.kafka.streams.tests.StreamsEosTest"
        super(StreamsEosTestBaseService, self).__init__(test_context, kafka, eos_class, command)

    def clean_node(self, node):
        """Delegate to the parent clean-up only when ``clean_node_enabled`` is set."""
        if not self.clean_node_enabled:
            return
        super(StreamsEosTestBaseService, self).clean_node(node)
|
2018-02-17 05:13:13 +08:00
|
|
|
|
2017-06-09 05:08:54 +08:00
|
|
|
|
2016-02-24 04:14:26 +08:00
|
|
|
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
    """Smoke-test service started with the "run" command argument.

    Optionally appends "disableAutoTerminate" to the Java command line.
    """

    def __init__(self, test_context, kafka):
        super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
        # Extra trailing argument for the Java class; empty unless disable_auto_terminate() is called.
        self.DISABLE_AUTO_TERMINATE = ""

    def disable_auto_terminate(self):
        """Make start_cmd() pass "disableAutoTerminate" to the Java class."""
        self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"

    def start_cmd(self, node):
        """Build the background launch command (user_test_args2-4 are not passed here)."""
        args = self.args.copy()
        args['config_file'] = self.CONFIG_FILE
        args['stdout'] = self.STDOUT_FILE
        args['stderr'] = self.STDERR_FILE
        args['pidfile'] = self.PID_FILE
        args['log4j'] = self.LOG4J_CONFIG_FILE
        args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)

        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
              "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
              " %(config_file)s %(user_test_args1)s %(disable_auto_terminate)s" \
              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args

        # Log the command like the base-class start_cmd does, so failed launches can be
        # reproduced from the test log.
        self.logger.info("Executing streams cmd: " + cmd)

        return cmd
|
2016-04-30 01:28:33 +08:00
|
|
|
|
2016-02-24 04:14:26 +08:00
|
|
|
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
    """Smoke-test service started with the "process" command argument."""

    def __init__(self, test_context, kafka, processing_guarantee, num_threads=3):
        super(StreamsSmokeTestJobRunnerService, self).__init__(
            test_context, kafka, "process",
            processing_guarantee=processing_guarantee,
            num_threads=num_threads)
|
2016-11-18 04:49:20 +08:00
|
|
|
|
2017-06-09 05:08:54 +08:00
|
|
|
class StreamsEosTestDriverService(StreamsEosTestBaseService):
    """EOS test service started with the "run" command argument."""

    def __init__(self, test_context, kafka):
        command = "run"
        super(StreamsEosTestDriverService, self).__init__(test_context, kafka, command)
|
|
|
|
|
|
|
|
|
|
|
|
class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
    """EOS test service started with the "process" command argument."""

    def __init__(self, test_context, kafka):
        command = "process"
        super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, command)
|
|
|
|
|
2017-06-26 00:34:27 +08:00
|
|
|
class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
    """EOS test service started with the "process-complex" command argument."""

    def __init__(self, test_context, kafka):
        command = "process-complex"
        super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, command)
|
|
|
|
|
2017-06-09 05:08:54 +08:00
|
|
|
class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
    """EOS test service started with the "verify" command argument."""

    def __init__(self, test_context, kafka):
        command = "verify"
        super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, command)
|
|
|
|
|
|
|
|
|
2017-06-26 00:34:27 +08:00
|
|
|
class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
    """EOS test service started with the "verify-complex" command argument."""

    def __init__(self, test_context, kafka):
        command = "verify-complex"
        super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, command)
|
|
|
|
|
|
|
|
|
2016-11-18 04:49:20 +08:00
|
|
|
class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
    """Smoke-test service started with the "close-deadlock-test" command argument."""

    def __init__(self, test_context, kafka):
        command = "close-deadlock-test"
        super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, command)
|
2017-01-27 13:46:37 +08:00
|
|
|
|
|
|
|
|
|
|
|
class StreamsBrokerCompatibilityService(StreamsTestBaseService):
    """Runs BrokerCompatibilityTest, passing ``processingMode`` as its first user argument."""

    def __init__(self, test_context, kafka, processingMode):
        test_class = "org.apache.kafka.streams.tests.BrokerCompatibilityTest"
        super(StreamsBrokerCompatibilityService, self).__init__(test_context, kafka, test_class, processingMode)
|
2017-12-20 07:37:21 +08:00
|
|
|
|
2018-02-08 09:21:35 +08:00
|
|
|
|
2017-12-20 07:37:21 +08:00
|
|
|
class StreamsBrokerDownResilienceService(StreamsTestBaseService):
    """Runs StreamsBrokerDownResilienceTest, passing ``configs`` as its first user argument.

    The launch command comes from the inherited StreamsTestBaseService.start_cmd, which
    already passes the config file and all four user arguments. Keeping a copy of it
    here would only invite the two to drift apart.
    """

    def __init__(self, test_context, kafka, configs):
        super(StreamsBrokerDownResilienceService, self).__init__(test_context,
                                                                 kafka,
                                                                 "org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest",
                                                                 configs)
|
2018-03-06 03:06:32 +08:00
|
|
|
|
|
|
|
|
|
|
|
class StreamsStandbyTaskService(StreamsTestBaseService):
    """Runs StreamsStandByReplicaTest, passing ``configs`` as its first user argument."""

    def __init__(self, test_context, kafka, configs):
        test_class = "org.apache.kafka.streams.tests.StreamsStandByReplicaTest"
        super(StreamsStandbyTaskService, self).__init__(test_context, kafka, test_class, configs)
|
|
|
|
|
|
|
|
|
2018-11-28 05:07:34 +08:00
|
|
|
class StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
    """Runs StreamsOptimizedTest with topology-optimization and topic settings from attributes.

    Tests are expected to assign the upper-case attributes before starting the service.
    """

    def __init__(self, test_context, kafka):
        test_class = "org.apache.kafka.streams.tests.StreamsOptimizedTest"
        super(StreamsOptimizedUpgradeTestService, self).__init__(test_context, kafka, test_class, "")
        self.OPTIMIZED_CONFIG = 'none'
        self.INPUT_TOPIC = None
        self.AGGREGATION_TOPIC = None
        self.REDUCE_TOPIC = None
        self.JOIN_TOPIC = None

    def prop_file(self):
        """Render base settings plus optimization level and the four topic names."""
        properties = {
            streams_property.STATE_DIR: self.PERSISTENT_ROOT,
            streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
            'topology.optimization': self.OPTIMIZED_CONFIG,
            'input.topic': self.INPUT_TOPIC,
            'aggregation.topic': self.AGGREGATION_TOPIC,
            'reduce.topic': self.REDUCE_TOPIC,
            'join.topic': self.JOIN_TOPIC,
        }
        return KafkaConfig(**properties).render()
|
|
|
|
|
|
|
|
|
2018-04-07 08:00:52 +08:00
|
|
|
class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
    """Runs StreamsUpgradeTest against a selectable Kafka Streams version.

    ``set_version`` must be called before starting: ``start_cmd`` reads
    ``KAFKA_STREAMS_VERSION``, which ``__init__`` does not initialise.
    """

    def __init__(self, test_context, kafka):
        test_class = "org.apache.kafka.streams.tests.StreamsUpgradeTest"
        super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context, kafka, test_class, "")
        self.UPGRADE_FROM = None
        self.UPGRADE_TO = None

    def set_version(self, kafka_streams_version):
        """Choose the value exported as UPGRADE_KAFKA_STREAMS_TEST_VERSION."""
        self.KAFKA_STREAMS_VERSION = kafka_streams_version

    def set_upgrade_from(self, upgrade_from):
        """Set the 'upgrade.from' config value (omitted while None)."""
        self.UPGRADE_FROM = upgrade_from

    def set_upgrade_to(self, upgrade_to):
        """Set the upgrade target; "future_version" adds 'test.future.metadata'."""
        self.UPGRADE_TO = upgrade_to

    def prop_file(self):
        """Render base settings plus optional upgrade-related properties."""
        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
        upgrade_from = self.UPGRADE_FROM
        if upgrade_from is not None:
            properties['upgrade.from'] = upgrade_from
        wants_future_metadata = self.UPGRADE_TO == "future_version"
        if wants_future_metadata:
            properties['test.future.metadata'] = "any_value"
        config = KafkaConfig(**properties)
        return config.render()

    def start_cmd(self, node):
        """Build the background launch command for the selected Streams version."""
        args = self.args.copy()
        # Only the 0.10.0 / 0.10.1 clients get the ZooKeeper connect string on their
        # command line; every other version gets an empty placeholder.
        if self.KAFKA_STREAMS_VERSION in (str(LATEST_0_10_0), str(LATEST_0_10_1)):
            zk_setting = self.kafka.zk.connect_setting()
        else:
            zk_setting = ""
        args.update(zk=zk_setting,
                    config_file=self.CONFIG_FILE,
                    stdout=self.STDOUT_FILE,
                    stderr=self.STDERR_FILE,
                    pidfile=self.PID_FILE,
                    log4j=self.LOG4J_CONFIG_FILE,
                    version=self.KAFKA_STREAMS_VERSION,
                    kafka_run_class=self.path.script("kafka-run-class.sh", node))

        template = ("( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; "
                    "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s "
                    " %(kafka_run_class)s %(streams_class_name)s %(zk)s %(config_file)s "
                    " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s")
        cmd = template % args

        self.logger.info("Executing: " + cmd)
        return cmd
|
2018-12-04 04:37:31 +08:00
|
|
|
|
|
|
|
|
|
|
|
class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
    """Runs StreamsNamedRepartitionTest with topics and an add-operations flag from attributes."""

    def __init__(self, test_context, kafka):
        test_class = "org.apache.kafka.streams.tests.StreamsNamedRepartitionTest"
        super(StreamsNamedRepartitionTopicService, self).__init__(test_context, kafka, test_class, "")
        self.ADD_ADDITIONAL_OPS = 'false'
        self.INPUT_TOPIC = None
        self.AGGREGATION_TOPIC = None

    def prop_file(self):
        """Render base settings plus input/aggregation topics and 'add.operations'."""
        properties = {
            streams_property.STATE_DIR: self.PERSISTENT_ROOT,
            streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
            'input.topic': self.INPUT_TOPIC,
            'aggregation.topic': self.AGGREGATION_TOPIC,
            'add.operations': self.ADD_ADDITIONAL_OPS,
        }
        return KafkaConfig(**properties).render()
|
2019-06-08 04:52:12 +08:00
|
|
|
|
2019-10-17 13:29:33 +08:00
|
|
|
|
2019-06-08 04:52:12 +08:00
|
|
|
class StaticMemberTestService(StreamsTestBaseService):
    """Runs StaticMemberTestClient with a given group instance id and thread count."""

    def __init__(self, test_context, kafka, group_instance_id, num_threads):
        test_class = "org.apache.kafka.streams.tests.StaticMemberTestClient"
        super(StaticMemberTestService, self).__init__(test_context, kafka, test_class, "")
        self.INPUT_TOPIC = None
        self.GROUP_INSTANCE_ID = group_instance_id
        self.NUM_THREADS = num_threads

    def prop_file(self):
        """Render base settings, thread count, group instance id, a 60s session timeout and the input topic."""
        properties = {
            streams_property.STATE_DIR: self.PERSISTENT_ROOT,
            streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
            streams_property.NUM_THREADS: self.NUM_THREADS,
            consumer_property.GROUP_INSTANCE_ID: self.GROUP_INSTANCE_ID,
            consumer_property.SESSION_TIMEOUT_MS: 60000,
            'input.topic': self.INPUT_TOPIC,
        }
        return KafkaConfig(**properties).render()
|
2019-10-17 13:29:33 +08:00
|
|
|
|
|
|
|
|
|
|
|
class CooperativeRebalanceUpgradeService(StreamsTestBaseService):
    """Runs StreamsUpgradeToCooperativeRebalanceTest against a selectable Streams version.

    ``set_version`` must be called before starting: ``start_cmd`` reads
    ``KAFKA_STREAMS_VERSION``, which ``__init__`` does not initialise.
    """

    def __init__(self, test_context, kafka):
        super(CooperativeRebalanceUpgradeService, self).__init__(test_context,
                                                                 kafka,
                                                                 "org.apache.kafka.streams.tests.StreamsUpgradeToCooperativeRebalanceTest",
                                                                 "")
        self.UPGRADE_FROM = None
        # these properties will be overridden in test
        self.SOURCE_TOPIC = None
        self.SINK_TOPIC = None
        self.TASK_DELIMITER = "#"
        self.REPORT_INTERVAL = None

        # Filled in by set_tasks() / set_upgrade_phase().
        self.standby_tasks = None
        self.active_tasks = None
        self.upgrade_phase = None

    def set_tasks(self, task_string):
        """Parse a "TASK-ASSIGNMENTS:" string into active and standby task-id sets.

        The label prefix is removed by length and is not validated. The remainder is
        split on TASK_DELIMITER: the first segment holds comma-separated active tasks,
        and the optional second segment holds comma-separated standby tasks. When no
        standby segment is present, ``standby_tasks`` keeps its previous value.
        """
        label = "TASK-ASSIGNMENTS:"
        task_string_substr = task_string[len(label):]
        all_tasks = task_string_substr.split(self.TASK_DELIMITER)
        self.active_tasks = set(all_tasks[0].split(","))
        if len(all_tasks) > 1:
            self.standby_tasks = set(all_tasks[1].split(","))

    def set_version(self, kafka_streams_version):
        """Choose the value exported as UPGRADE_KAFKA_STREAMS_TEST_VERSION."""
        self.KAFKA_STREAMS_VERSION = kafka_streams_version

    def set_upgrade_phase(self, upgrade_phase):
        """Set the 'upgrade.phase' config value (omitted while None)."""
        self.upgrade_phase = upgrade_phase

    def start_cmd(self, node):
        """Build the background launch command for the selected Streams version."""
        args = self.args.copy()

        # Only the 0.10.0 / 0.10.1 clients get the ZooKeeper connect string.
        if self.KAFKA_STREAMS_VERSION in (str(LATEST_0_10_0), str(LATEST_0_10_1)):
            args['zk'] = self.kafka.zk.connect_setting()
        else:
            args['zk'] = ""
        args['config_file'] = self.CONFIG_FILE
        args['stdout'] = self.STDOUT_FILE
        args['stderr'] = self.STDERR_FILE
        args['pidfile'] = self.PID_FILE
        args['log4j'] = self.LOG4J_CONFIG_FILE
        args['version'] = self.KAFKA_STREAMS_VERSION
        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)

        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
              "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
              " %(kafka_run_class)s %(streams_class_name)s %(zk)s %(config_file)s " \
              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args

        self.logger.info("Executing: " + cmd)

        return cmd

    def prop_file(self):
        """Render base settings plus upgrade, topic, delimiter and reporting properties."""
        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}

        # 'upgrade.from' is only ever added here, so leaving it out when UPGRADE_FROM
        # is None is enough. The dict is freshly built on every call, so there is no
        # stale key to delete.
        if self.UPGRADE_FROM is not None:
            properties['upgrade.from'] = self.UPGRADE_FROM

        if self.upgrade_phase is not None:
            properties['upgrade.phase'] = self.upgrade_phase

        properties['source.topic'] = self.SOURCE_TOPIC
        properties['sink.topic'] = self.SINK_TOPIC
        properties['task.delimiter'] = self.TASK_DELIMITER
        properties['report.interval'] = self.REPORT_INTERVAL

        cfg = KafkaConfig(**properties)
        return cfg.render()
|