From 4f39b5bc5b2104fb39ab5b0c087fe84a71205a74 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 23 Jan 2016 16:59:28 -0800 Subject: [PATCH] KAFKA-2846: Add Ducktape test for kafka-consumer-groups Author: Ashish Singh Reviewers: Geoff Anderson , Ewen Cheslack-Postava Closes #555 from SinghAsDev/KAFKA-2846 --- tests/kafkatest/services/kafka/kafka.py | 50 +++++++++ .../tests/consumer_group_command_test.py | 105 ++++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 tests/kafkatest/tests/consumer_group_command_test.py diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index cb5018caf5c..b9105df544d 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -413,6 +413,56 @@ class KafkaService(JmxMixin, Service): self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) return self.get_node(leader_idx) + def list_consumer_groups(self, node=None, new_consumer=False, command_config=None): + """ Get list of consumer groups. + """ + if node is None: + node = self.nodes[0] + + if command_config is None: + command_config = "" + else: + command_config = "--command-config " + command_config + + if new_consumer: + cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --list" % \ + (kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config) + else: + cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --list" % \ + (kafka_dir(node), self.zk.connect_setting(), command_config) + output = "" + self.logger.debug(cmd) + for line in node.account.ssh_capture(cmd): + if not line.startswith("SLF4J"): + output += line + self.logger.debug(output) + return output + + def describe_consumer_group(self, group, node=None, new_consumer=False, command_config=None): + """ Describe a consumer group. + """ + if node is None: + node = self.nodes[0] + + if command_config is None: + command_config = "" + else: + command_config = "--command-config " + command_config + + if new_consumer: + cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --group %s --describe" % \ + (kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config, group) + else: + cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --group %s --describe" % \ + (kafka_dir(node), self.zk.connect_setting(), command_config, group) + output = "" + self.logger.debug(cmd) + for line in node.account.ssh_capture(cmd): + if not (line.startswith("SLF4J") or line.startswith("GROUP, TOPIC") or line.startswith("Could not fetch offset")): + output += line + self.logger.debug(output) + return output + def bootstrap_servers(self, protocol='PLAINTEXT'): """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,... diff --git a/tests/kafkatest/tests/consumer_group_command_test.py b/tests/kafkatest/tests/consumer_group_command_test.py new file mode 100644 index 00000000000..a7b43a15b9d --- /dev/null +++ b/tests/kafkatest/tests/consumer_group_command_test.py @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.utils.util import wait_until +from ducktape.tests.test import Test +from ducktape.mark import matrix + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.security.security_config import SecurityConfig + +import os + +TOPIC = "topic-consumer-group-command" + +class ConsumerGroupCommandTest(Test): + """ + Tests ConsumerGroupCommand + """ + # Root directory for persistent output + PERSISTENT_ROOT = "/mnt/consumer_group_command" + COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties") + + def __init__(self, test_context): + super(ConsumerGroupCommandTest, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 1 + self.topics = { + TOPIC: {'partitions': 1, 'replication-factor': 1} + } + self.zk = ZookeeperService(test_context, self.num_zk) + + def setUp(self): + self.zk.start() + + def start_kafka(self, security_protocol, interbroker_security_protocol): + self.kafka = KafkaService( + self.test_context, self.num_brokers, + self.zk, security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics) + self.kafka.start() + + def start_consumer(self, security_protocol): + enable_new_consumer = security_protocol == SecurityConfig.SSL + self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, + consumer_timeout_ms=None, new_consumer=enable_new_consumer) + self.consumer.start() + + def setup_and_verify(self, security_protocol, group=None): + self.start_kafka(security_protocol, security_protocol) + self.start_consumer(security_protocol) + consumer_node = self.consumer.nodes[0] + wait_until(lambda: self.consumer.alive(consumer_node), + timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") + kafka_node = self.kafka.nodes[0] + if security_protocol is not SecurityConfig.PLAINTEXT: + prop_file = str(self.kafka.security_config.client_config()) + self.logger.debug(prop_file) + kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file) + + # Verify ConsumerGroupCommand lists expected consumer groups + enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT + command_config_file = None + if enable_new_consumer: + command_config_file = self.COMMAND_CONFIG_FILE + + if group: + wait_until(lambda: ("%s, topic-consumer-group-command, 0," % group) in self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10, + err_msg="Timed out waiting to list expected consumer groups.") + else: + wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10, + err_msg="Timed out waiting to list expected consumer groups.") + + self.consumer.stop() + + @matrix(security_protocol=['PLAINTEXT', 'SSL']) + def test_list_consumer_groups(self, security_protocol='PLAINTEXT'): + """ + Tests if ConsumerGroupCommand is listing correct consumer groups + :return: None + """ + self.setup_and_verify(security_protocol) + + @matrix(security_protocol=['PLAINTEXT', 'SSL']) + def test_describe_consumer_group(self, security_protocol='PLAINTEXT'): + """ + Tests if ConsumerGroupCommand is describing a consumer group correctly + :return: None + """ + self.setup_and_verify(security_protocol, group="test-consumer-group")