KAFKA-7773; Add end to end system test relying on verifiable consumer (#6070)

This commit creates an EndToEndTest base class which relies on the verifiable consumer. This will ultimately replace ProduceConsumeValidateTest which depends on the console consumer. The advantage is that the verifiable consumer exposes more information to use for validation. It also allows for a nicer shutdown pattern. Rather than relying on the console consumer idle timeout, which requires a minimum wait time, we can halt consumption after we have reached the last acked offsets. This should be more reliable and faster. The downside is that the verifiable consumer only works with the new consumer, so we cannot yet convert the upgrade tests. This commit converts only the replication tests and a flaky security test to use EndToEndTest.
Jason Gustafson 2019-01-08 06:14:51 -08:00 committed by Rajini Sivaram
parent 8ae985705f
commit f9a22f42a8
10 changed files with 290 additions and 132 deletions
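
For illustration only (this sketch is not part of the commit): a minimal test built on the new EndToEndTest base class. The class name, node counts, and the restart step are hypothetical; the helper methods (create_zookeeper, create_kafka, create_producer, create_consumer, await_startup, run_validation) and restart_cluster are the ones introduced in the diff below.

# Hypothetical example of a test built on EndToEndTest; not part of this commit.
from ducktape.mark.resource import cluster

from kafkatest.tests.end_to_end import EndToEndTest


class ExampleRestartTest(EndToEndTest):

    def __init__(self, test_context):
        super(ExampleRestartTest, self).__init__(test_context=test_context,
                                                 topic_config={"partitions": 3, "replication-factor": 3})

    @cluster(num_nodes=6)
    def test_clean_bounce(self):
        self.create_zookeeper()
        self.zk.start()

        self.create_kafka(num_nodes=3)
        self.kafka.start()

        self.create_producer()
        self.producer.start()

        self.create_consumer(log_level="DEBUG")
        self.consumer.start()

        # Block until a handful of records have been delivered end to end
        self.await_startup()

        # Core test action: bounce the brokers while traffic is flowing
        self.kafka.restart_cluster(clean_shutdown=True)

        # Stop the producer, wait for the consumer to reach the last acked
        # offsets, then check that every acked record was consumed
        self.run_validation()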


@@ -532,6 +532,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return missing
def restart_cluster(self, clean_shutdown=True):
for node in self.nodes:
self.restart_node(node, clean_shutdown=clean_shutdown)
def restart_node(self, node, clean_shutdown=True):
"""Restart the given node."""
self.stop_node(node, clean_shutdown)


@@ -50,6 +50,7 @@ replica.lag.time.max.ms={{replica_lag}}
{% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
auto.create.topics.enable={{ auto_create_topics_enable }}
{% endif %}
offsets.topic.num.partitions={{ num_nodes }}
offsets.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
# Set to a low, but non-zero value to exercise this path without making tests much slower
group.initial.rebalance.delay.ms=100


@@ -161,7 +161,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
def __init__(self, context, num_nodes, kafka, topic, group_id,
max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None):
version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
on_record_consumed=None):
"""
:param jaas_override_variables: A dict of variables to be used in the jaas.conf template file
"""
@@ -177,6 +178,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
self.assignment_strategy = assignment_strategy
self.prop_file = ""
self.stop_timeout_sec = stop_timeout_sec
self.on_record_consumed = on_record_consumed
self.event_handlers = {}
self.global_position = {}
@@ -228,6 +230,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
elif name == "records_consumed":
handler.handle_records_consumed(event)
self._update_global_position(event, node)
elif name == "record_data" and self.on_record_consumed:
self.on_record_consumed(event, node)
elif name == "partitions_revoked":
handler.handle_partitions_revoked(event)
elif name == "partitions_assigned":
@@ -268,6 +272,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
cmd += self.impl.exec_cmd(node)
if self.on_record_consumed:
cmd += " --verbose"
cmd += " --group-id %s --topic %s --broker-list %s --session-timeout %s --assignment-strategy %s %s" % \
(self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol),
self.session_timeout_sec*1000, self.assignment_strategy, "--enable-autocommit" if self.enable_autocommit else "")


@@ -20,6 +20,7 @@ import time
from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_client import VerifiableClientMixin
from kafkatest.utils import is_int, is_int_with_prefix
from kafkatest.version import DEV_BRANCH
@@ -90,6 +91,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
for node in self.nodes:
node.version = version
self.acked_values = []
self._last_acked_offsets = {}
self.not_acked_values = []
self.produced_count = {}
self.clean_shutdown_nodes = set()
@@ -175,7 +177,9 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
self.produced_count[idx] += 1
elif data["name"] == "producer_send_success":
partition = TopicPartition(data["topic"], data["partition"])
self.acked_values.append(self.message_validator(data["value"]))
self._last_acked_offsets[partition] = data["offset"]
self.produced_count[idx] += 1
# Log information if there is a large gap between successively acknowledged messages
@@ -241,6 +245,11 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
def alive(self, node):
return len(self.pids(node)) > 0
@property
def last_acked_offsets(self):
with self.lock:
return self._last_acked_offsets
@property
def acked(self):
with self.lock:


@@ -19,12 +19,7 @@ from ducktape.mark import matrix
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.tests.end_to_end import EndToEndTest
import signal
@@ -83,7 +78,7 @@ failures = {
}
class ReplicationTest(ProduceConsumeValidateTest):
class ReplicationTest(EndToEndTest):
"""
Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
(foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
@@ -98,25 +93,16 @@ class ReplicationTest(ProduceConsumeValidateTest):
indicator that nothing is left to consume.
"""
TOPIC_CONFIG = {
"partitions": 3,
"replication-factor": 3,
"configs": {"min.insync.replicas": 2}
}
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(ReplicationTest, self).__init__(test_context=test_context)
self.topic = "test_topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
'configs': {"min.insync.replicas": 2}}
})
self.producer_throughput = 1000
self.num_producers = 1
self.num_consumers = 1
def setUp(self):
self.zk.start()
super(ReplicationTest, self).__init__(test_context=test_context, topic_config=self.TOPIC_CONFIG)
def min_cluster_size(self):
"""Override this since we're adding services outside of the constructor"""
return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@@ -156,15 +142,23 @@ class ReplicationTest(ProduceConsumeValidateTest):
- Validate that every acked message was consumed
"""
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
self.kafka.client_sasl_mechanism = client_sasl_mechanism
self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self.enable_idempotence = enable_idempotence
compression_types = None if not compression_type else [compression_type] * self.num_producers
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic,
throughput=self.producer_throughput, compression_types=compression_types,
enable_idempotence=enable_idempotence)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=60000, message_validator=is_int)
self.create_zookeeper()
self.zk.start()
self.create_kafka(num_nodes=3,
security_protocol=security_protocol,
interbroker_security_protocol=security_protocol,
client_sasl_mechanism=client_sasl_mechanism,
interbroker_sasl_mechanism=interbroker_sasl_mechanism)
self.kafka.start()
self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))
compression_types = None if not compression_type else [compression_type]
self.create_producer(compression_types=compression_types, enable_idempotence=enable_idempotence)
self.producer.start()
self.create_consumer(log_level="DEBUG")
self.consumer.start()
self.await_startup()
failures[failure_mode](self, broker_type)
self.run_validation(enable_idempotence=enable_idempotence)


@@ -19,14 +19,9 @@ from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from ducktape.errors import TimeoutError
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.security.security_config import SslStores
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.tests.end_to_end import EndToEndTest
class TestSslStores(SslStores):
def __init__(self, local_scratch_dir, valid_hostname=True):
@@ -41,7 +36,7 @@ class TestSslStores(SslStores):
else:
return "invalidhostname"
class SecurityTest(ProduceConsumeValidateTest):
class SecurityTest(EndToEndTest):
"""
These tests validate security features.
"""
@@ -50,21 +45,6 @@ class SecurityTest(ProduceConsumeValidateTest):
""":type test_context: ducktape.tests.test.TestContext"""
super(SecurityTest, self).__init__(test_context=test_context)
self.topic = "test_topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
"partitions": 2,
"replication-factor": 1}
})
self.num_partitions = 2
self.timeout_sec = 10000
self.producer_throughput = 1000
self.num_producers = 1
self.num_consumers = 1
def setUp(self):
self.zk.start()
def producer_consumer_have_expected_error(self, error):
try:
for node in self.producer.nodes:
@@ -87,16 +67,19 @@ class SecurityTest(ProduceConsumeValidateTest):
with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
"""
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = interbroker_security_protocol
SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, valid_hostname=False)
SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
valid_hostname=False)
self.create_zookeeper()
self.zk.start()
self.create_kafka(security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol)
self.kafka.start()
self.create_producer_and_consumer()
self.producer.log_level = "TRACE"
self.producer.start()
self.consumer.start()
# We need more verbose logging to catch the expected errors
self.create_and_start_clients(log_level="DEBUG")
try:
wait_until(lambda: self.producer.num_acked > 0, timeout_sec=5)
@@ -109,19 +92,17 @@ class SecurityTest(ProduceConsumeValidateTest):
error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=5)
self.producer.stop()
self.consumer.stop()
self.producer.log_level = "INFO"
SecurityConfig.ssl_stores.valid_hostname = True
for node in self.kafka.nodes:
self.kafka.restart_node(node, clean_shutdown=True)
self.kafka.restart_cluster()
self.create_and_start_clients(log_level="INFO")
self.run_validation()
self.create_producer_and_consumer()
self.run_produce_consume_validate()
def create_producer_and_consumer(self):
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int)
def create_and_start_clients(self, log_level):
self.create_producer(log_level=log_level)
self.producer.start()
self.create_consumer(log_level=log_level)
self.consumer.start()


@@ -0,0 +1,151 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils import validate_delivery
import time
class EndToEndTest(Test):
    """This class provides a shared template for tests which follow the common pattern of:

        - produce to a topic in the background
        - consume from that topic in the background
        - run some logic, e.g. fail topic leader etc.
        - perform validation
    """

    DEFAULT_TOPIC_CONFIG = {"partitions": 2, "replication-factor": 1}

    def __init__(self, test_context, topic="test_topic", topic_config=DEFAULT_TOPIC_CONFIG):
        super(EndToEndTest, self).__init__(test_context=test_context)
        self.topic = topic
        self.topic_config = topic_config
        self.records_consumed = []
        self.last_consumed_offsets = {}

    def create_zookeeper(self, num_nodes=1, **kwargs):
        self.zk = ZookeeperService(self.test_context, num_nodes=num_nodes, **kwargs)

    def create_kafka(self, num_nodes=1, **kwargs):
        group_metadata_config = {
            "partitions": num_nodes,
            "replication-factor": min(num_nodes, 3),
            "configs": {"cleanup.policy": "compact"}
        }
        topics = {
            self.topic: self.topic_config,
            "__consumer_offsets": group_metadata_config
        }
        self.kafka = KafkaService(self.test_context, num_nodes=num_nodes,
                                  zk=self.zk, topics=topics, **kwargs)

    def create_consumer(self, num_nodes=1, group_id="test_group", **kwargs):
        self.consumer = VerifiableConsumer(self.test_context,
                                           num_nodes=num_nodes,
                                           kafka=self.kafka,
                                           topic=self.topic,
                                           group_id=group_id,
                                           on_record_consumed=self.on_record_consumed,
                                           **kwargs)

    def create_producer(self, num_nodes=1, throughput=1000, **kwargs):
        self.producer = VerifiableProducer(self.test_context,
                                           num_nodes=num_nodes,
                                           kafka=self.kafka,
                                           topic=self.topic,
                                           throughput=throughput,
                                           **kwargs)

    def on_record_consumed(self, record, node):
        partition = TopicPartition(record["topic"], record["partition"])
        record_id = int(record["value"])
        offset = record["offset"]
        self.last_consumed_offsets[partition] = offset
        self.records_consumed.append(record_id)

    def await_consumed_offsets(self, last_acked_offsets, timeout_sec):
        def has_finished_consuming():
            for partition, offset in last_acked_offsets.iteritems():
                if not partition in self.last_consumed_offsets:
                    return False
                if self.last_consumed_offsets[partition] < offset:
                    return False
            return True

        wait_until(has_finished_consuming,
                   timeout_sec=timeout_sec,
                   err_msg="Consumer failed to consume up to offsets %s after waiting %ds." %\
                   (str(last_acked_offsets), timeout_sec))

    def _collect_all_logs(self):
        for s in self.test_context.services:
            self.mark_for_collect(s)

    def await_startup(self, min_records=5, timeout_sec=30):
        try:
            wait_until(lambda: self.consumer.total_consumed() >= min_records,
                       timeout_sec=timeout_sec,
                       err_msg="Timed out after %ds while awaiting initial record delivery of %d records" %\
                       (timeout_sec, min_records))
        except BaseException:
            self._collect_all_logs()
            raise

    def run_validation(self, min_records=5000, producer_timeout_sec=30,
                       consumer_timeout_sec=30, enable_idempotence=False):
        try:
            wait_until(lambda: self.producer.num_acked > min_records,
                       timeout_sec=producer_timeout_sec,
                       err_msg="Producer failed to produce messages for %ds." %\
                       producer_timeout_sec)

            self.logger.info("Stopping producer after writing up to offsets %s" %\
                             str(self.producer.last_acked_offsets))
            self.producer.stop()

            self.await_consumed_offsets(self.producer.last_acked_offsets, consumer_timeout_sec)
            self.consumer.stop()

            self.validate(enable_idempotence)
        except BaseException:
            self._collect_all_logs()
            raise

    def validate(self, enable_idempotence):
        self.logger.info("Number of acked records: %d" % len(self.producer.acked))
        self.logger.info("Number of consumed records: %d" % len(self.records_consumed))

        def check_lost_data(missing_records):
            return self.kafka.search_data_files(self.topic, missing_records)

        succeeded, error_msg = validate_delivery(self.producer.acked, self.records_consumed,
                                                 enable_idempotence, check_lost_data)

        # Collect all logs if validation fails
        if not succeeded:
            self._collect_all_logs()

        assert succeeded, error_msg


@@ -15,6 +15,9 @@
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.utils import validate_delivery
import time
class ProduceConsumeValidateTest(Test):
@@ -112,68 +115,21 @@ class ProduceConsumeValidateTest(Test):
self.mark_for_collect(s)
raise
@staticmethod
def annotate_missing_msgs(missing, acked, consumed, msg):
missing_list = list(missing)
msg += "%s acked message did not make it to the Consumer. They are: " %\
len(missing_list)
if len(missing_list) < 20:
msg += str(missing_list) + ". "
else:
msg += ", ".join(str(m) for m in missing_list[:20])
msg += "...plus %s more. Total Acked: %s, Total Consumed: %s. " \
% (len(missing_list) - 20, len(set(acked)), len(set(consumed)))
return msg
@staticmethod
def annotate_data_lost(data_lost, msg, number_validated):
print_limit = 10
if len(data_lost) > 0:
msg += "The first %s missing messages were validated to ensure they are in Kafka's data files. " \
"%s were missing. This suggests data loss. Here are some of the messages not found in the data files: %s\n" \
% (number_validated, len(data_lost), str(data_lost[0:print_limit]) if len(data_lost) > print_limit else str(data_lost))
else:
msg += "We validated that the first %s of these missing messages correctly made it into Kafka's data files. " \
"This suggests they were lost on their way to the consumer." % number_validated
return msg
def validate(self):
"""Check that each acked message was consumed."""
success = True
msg = ""
acked = self.producer.acked
consumed = self.consumer.messages_consumed[1]
# Correctness of the set difference operation depends on using equivalent message_validators in producer and consumer
missing = set(acked) - set(consumed)
messages_consumed = self.consumer.messages_consumed[1]
self.logger.info("num consumed: %d" % len(consumed))
self.logger.info("Number of acked records: %d" % len(self.producer.acked))
self.logger.info("Number of consumed records: %d" % len(messages_consumed))
# Were all acked messages consumed?
if len(missing) > 0:
msg = self.annotate_missing_msgs(missing, acked, consumed, msg)
success = False
def check_lost_data(missing_records):
return self.kafka.search_data_files(self.topic, missing_records)
#Did we miss anything due to data loss?
to_validate = list(missing)[0:1000 if len(missing) > 1000 else len(missing)]
data_lost = self.kafka.search_data_files(self.topic, to_validate)
msg = self.annotate_data_lost(data_lost, msg, len(to_validate))
if self.enable_idempotence:
self.logger.info("Ran a test with idempotence enabled. We expect no duplicates")
else:
self.logger.info("Ran a test with idempotence disabled.")
# Are there duplicates?
if len(set(consumed)) != len(consumed):
num_duplicates = abs(len(set(consumed)) - len(consumed))
msg += "(There are also %s duplicate messages in the log - but that is an acceptable outcome)\n" % num_duplicates
if self.enable_idempotence:
assert False, "Detected %s duplicates even though idempotence was enabled." % num_duplicates
succeeded, error_msg = validate_delivery(self.producer.acked, messages_consumed,
self.enable_idempotence, check_lost_data)
# Collect all logs if validation fails
if not success:
if not succeeded:
for s in self.test_context.services:
self.mark_for_collect(s)
assert success, msg
assert succeeded, error_msg


@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable
from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable, validate_delivery


@@ -112,3 +112,59 @@ def node_is_reachable(src_node, dst_node):
    :return: True only if dst is reachable from src.
    """
    return 0 == src_node.account.ssh("nc -w 3 -z %s 22" % dst_node.account.hostname, allow_fail=True)


def annotate_missing_msgs(missing, acked, consumed, msg):
    missing_list = list(missing)
    msg += "%s acked message did not make it to the Consumer. They are: " %\
        len(missing_list)
    if len(missing_list) < 20:
        msg += str(missing_list) + ". "
    else:
        msg += ", ".join(str(m) for m in missing_list[:20])
        msg += "...plus %s more. Total Acked: %s, Total Consumed: %s. " \
            % (len(missing_list) - 20, len(set(acked)), len(set(consumed)))
    return msg


def annotate_data_lost(data_lost, msg, number_validated):
    print_limit = 10
    if len(data_lost) > 0:
        msg += "The first %s missing messages were validated to ensure they are in Kafka's data files. " \
               "%s were missing. This suggests data loss. Here are some of the messages not found in the data files: %s\n" \
               % (number_validated, len(data_lost), str(data_lost[0:print_limit]) if len(data_lost) > print_limit else str(data_lost))
    else:
        msg += "We validated that the first %s of these missing messages correctly made it into Kafka's data files. " \
               "This suggests they were lost on their way to the consumer." % number_validated
    return msg


def validate_delivery(acked, consumed, idempotence_enabled=False, check_lost_data=None):
    """Check that each acked message was consumed."""
    success = True
    msg = ""

    # Correctness of the set difference operation depends on using equivalent
    # message_validators in producer and consumer
    missing = set(acked) - set(consumed)

    # Were all acked messages consumed?
    if len(missing) > 0:
        msg = annotate_missing_msgs(missing, acked, consumed, msg)
        success = False

    # Did we miss anything due to data loss?
    if check_lost_data:
        to_validate = list(missing)[0:1000 if len(missing) > 1000 else len(missing)]
        data_lost = check_lost_data(to_validate)
        msg = annotate_data_lost(data_lost, msg, len(to_validate))

    # Are there duplicates?
    if len(set(consumed)) != len(consumed):
        num_duplicates = abs(len(set(consumed)) - len(consumed))

        if idempotence_enabled:
            success = False
            msg += "Detected %d duplicates even though idempotence was enabled.\n" % num_duplicates
        else:
            msg += "(There are also %d duplicate messages in the log - but that is an acceptable outcome)\n" % num_duplicates

    return success, msg
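
As a rough illustration of the validate_delivery contract (the record values below are made up), it returns a (success, message) pair, and duplicate records only fail validation when idempotence is enabled:

# Illustrative only: exercises validate_delivery with made-up record values.
from kafkatest.utils import validate_delivery

acked = [1, 2, 3, 4, 5]
consumed = [1, 2, 2, 3, 4, 5]  # one duplicate, nothing missing

# Without idempotence, the duplicate is tolerated and merely noted in the message
success, msg = validate_delivery(acked, consumed, idempotence_enabled=False)
assert success

# With idempotence enabled, the same duplicate is reported as a failure
success, msg = validate_delivery(acked, consumed, idempotence_enabled=True)
assert not success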