KAFKA-16963: Ducktape test for KIP-853 (#17081)

Add a ducktape system test for KIP-853 quorum reconfiguration, including adding and removing voters.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Alyssa Huang 2024-09-06 13:44:09 -07:00 committed by GitHub
parent 62379d7d53
commit a9a4a52c9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 301 additions and 22 deletions

View File

@ -384,7 +384,7 @@ public class LeaderState<T> implements EpochState {
}
public boolean isReplicaCaughtUp(ReplicaKey replicaKey, long currentTimeMs) {
// In summary, let's consider a replica caughed up for add voter, if they
// In summary, let's consider a replica caught up for add voter, if they
// have fetched within the last hour
long anHourInMs = TimeUnit.HOURS.toMillis(1);
return Optional.ofNullable(observerStates.get(replicaKey))

View File

@ -19,12 +19,16 @@ Define Kafka configuration property names here.
# Names of the Kafka properties that identify a node and its role(s).
BROKER_ID = "broker.id"
NODE_ID = "node.id"
PROCESS_ROLES = "process.roles"
# Test-harness constants (these are values, not Kafka property names).
# Controller listener ports start 500 above the first broker port.
FIRST_BROKER_PORT = 9092
FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
# Base used by KafkaService to derive node.id for controller-only nodes.
FIRST_CONTROLLER_ID = 3001
# Cluster id that KafkaService passes to `kafka-storage.sh format --cluster-id`.
CLUSTER_ID = "I2eXt9rvSnyhct8BYmW6-w"
# Names of listener / networking related Kafka properties.
PORT = "port"
ADVERTISED_HOSTNAME = "advertised.host.name"
ADVERTISED_LISTENERS = "advertised.listeners"
LISTENERS = "listeners"
CONTROLLER_LISTENER_NAMES = "controller.listener.names"
NUM_NETWORK_THREADS = "num.network.threads"
NUM_IO_THREADS = "num.io.threads"

View File

@ -204,7 +204,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
controller_num_nodes_override=0,
allow_zk_with_kraft=False,
quorum_info_provider=None,
use_new_coordinator=None
use_new_coordinator=None,
dynamicRaftQuorum=False
):
"""
:param context: test context
@ -262,7 +263,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
e.g: {1: [["config1", "true"], ["config2", "1000"]], 2: [["config1", "false"], ["config2", "0"]]}
:param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable
:param KafkaService isolated_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper
:param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
:param int controller_num_nodes_override: the number of controller nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
:param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
:param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
:param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used.
@ -299,6 +300,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.num_nodes_controller_role = 0
if self.quorum_info.using_kraft:
self.dynamicRaftQuorum = dynamicRaftQuorum
self.first_controller_started = False
if self.quorum_info.has_brokers:
num_nodes_broker_role = num_nodes
if self.quorum_info.has_controllers:
@ -338,7 +341,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
listener_security_config=listener_security_config,
extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
isolated_kafka=self, allow_zk_with_kraft=self.allow_zk_with_kraft,
server_prop_overrides=server_prop_overrides
server_prop_overrides=server_prop_overrides, dynamicRaftQuorum=self.dynamicRaftQuorum
)
self.controller_quorum = self.isolated_controller_quorum
@ -435,15 +438,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
kraft_broker_plus_zk_configs.update(zk_broker_configs)
kraft_broker_plus_zk_configs.pop(config_property.BROKER_ID)
controller_only_configs = {
config_property.NODE_ID: self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1,
}
kraft_controller_plus_zk_configs = controller_only_configs.copy()
kraft_controller_plus_zk_configs.update(zk_broker_configs)
kraft_controller_plus_zk_configs.pop(config_property.BROKER_ID)
if node_quorum_info.service_quorum_info.using_zk:
node.config = KafkaConfig(**zk_broker_configs)
elif not node_quorum_info.has_broker_role: # KRaft controller-only role
controller_only_configs = {
config_property.NODE_ID: self.node_id_as_isolated_controller(node),
}
kraft_controller_plus_zk_configs = controller_only_configs.copy()
kraft_controller_plus_zk_configs.update(zk_broker_configs)
kraft_controller_plus_zk_configs.pop(config_property.BROKER_ID)
if self.zk:
node.config = KafkaConfig(**kraft_controller_plus_zk_configs)
else:
@ -456,6 +459,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.combined_nodes_started = 0
self.nodes_to_start = self.nodes
# Does not do any validation to check if this node is part of an isolated controller quorum or not
def node_id_as_isolated_controller(self, node):
    """Return the node.id that `node` uses when it runs as a controller-only node.

    The id is this service's index for the node, shifted so that the numbering
    starts at config_property.FIRST_CONTROLLER_ID. No check is made that the
    node actually belongs to an isolated controller quorum.

    :param node: a node of this service
    :return: the derived controller node id
    """
    service_index = self.idx(node)
    return service_index + config_property.FIRST_CONTROLLER_ID - 1
def reconfigure_zk_for_migration(self, kraft_quorum):
self.configured_for_zk_migration = True
self.controller_quorum = kraft_quorum
@ -628,7 +635,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def alive(self, node):
return len(self.pids(node)) > 0
def start(self, add_principals="", nodes_to_skip=[], timeout_sec=60, **kwargs):
def start(self, add_principals="", nodes_to_skip=[], isolated_controllers_to_skip=[], timeout_sec=60, **kwargs):
"""
Start the Kafka broker and wait until it registers its ID in ZooKeeper
Startup will be skipped for any nodes in nodes_to_skip. These nodes can be started later via add_broker
@ -666,7 +673,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self._ensure_zk_chroot()
if self.isolated_controller_quorum:
self.isolated_controller_quorum.start()
self.isolated_controller_quorum.start(nodes_to_skip=isolated_controllers_to_skip)
Service.start(self, **kwargs)
@ -728,7 +735,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.concurrent_start = False
self.start_node(node)
self.concurrent_start = orig_concurrent_start
wait_until(lambda: self.is_registered(node), 30, 1)
def _ensure_zk_chroot(self):
self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
@ -868,15 +874,19 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.maybe_setup_broker_scram_credentials(node)
if self.quorum_info.using_kraft:
# define controller.quorum.voters text
# define controller.quorum.bootstrap.servers or controller.quorum.voters text
security_protocol_to_use = self.controller_quorum.controller_security_protocol
first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID
self.controller_quorum_voters = ','.join(["%s@%s:%s" %
(self.controller_quorum.idx(node) + first_node_id - 1,
node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
controller_quorum_bootstrap_servers = ','.join(["%s:%s" % (node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
if self.dynamicRaftQuorum:
self.controller_quorum_bootstrap_servers = controller_quorum_bootstrap_servers
else:
self.controller_quorum_voters = ','.join(["%s@%s" % (self.controller_quorum.idx(node) + first_node_id - 1,
bootstrap_server)
for bootstrap_server in controller_quorum_bootstrap_servers])
# define controller.listener.names
self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match the isolated quorum if one exists
@ -893,8 +903,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
# format log directories if necessary
kafka_storage_script = self.path.script("kafka-storage.sh", node)
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
if self.dynamicRaftQuorum:
cmd += " --feature kraft.version=1"
if not self.first_controller_started and self.node_quorum_info.has_controller_role:
cmd += " --standalone"
self.logger.info("Running log directory format command...\n%s" % cmd)
node.account.ssh(cmd)
self.first_controller_started = True
cmd = self.start_cmd(node)
self.logger.debug("Attempting to start KafkaService %s on %s with command: %s" %\
@ -1010,9 +1025,26 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
JmxMixin.clean_node(self, node)
self.security_config.clean_node(node)
node.account.kill_process(self.java_class_name(),
clean_shutdown=False, allow_fail=True)
clean_shutdown=False, allow_fail=True)
node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False)
def kafka_metadata_quorum_cmd(self, node, kafka_security_protocol=None, use_controller_bootstrap=False):
    """Build the kafka-metadata-quorum.sh invocation prefix (script path plus bootstrap option).

    :param node: node used to resolve the path of the script
    :param kafka_security_protocol: security protocol to connect with. When None,
        PLAINTEXT is used if the inter-broker security protocol is PLAINTEXT,
        otherwise the client security protocol of this service is used.
    :param use_controller_bootstrap: when True, connect with --bootstrap-controller
        using the CONTROLLER_<protocol> listener; otherwise connect with --bootstrap-server
    :return: the command prefix as a string; the caller appends the sub-command
    """
    protocol = kafka_security_protocol
    if protocol is None:
        # Nothing was requested explicitly: prefer PLAINTEXT when the
        # inter-broker/controller listener is PLAINTEXT, else the client protocol.
        if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
            protocol = SecurityConfig.PLAINTEXT
        else:
            protocol = self.security_protocol

    if use_controller_bootstrap:
        endpoints = self.bootstrap_controllers("CONTROLLER_%s" % protocol)
        bootstrap_option = "--bootstrap-controller %s" % endpoints
    else:
        endpoints = self.bootstrap_servers(protocol)
        bootstrap_option = "--bootstrap-server %s" % endpoints

    script_path = self.path.script("kafka-metadata-quorum.sh", node)
    return "%s %s" % (script_path, bootstrap_option)
def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-topics against a broker, not a KRaft controller")
@ -1760,6 +1792,62 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.debug(output)
return output
def describe_quorum(self, node=None):
    """Run the describe quorum command and return everything it printed.

    :param node: node to run the command from; when not specified the command
        is run from self.nodes[0]
    :return: the captured output of `kafka-metadata-quorum.sh describe --status`
        as a single string
    """
    if node is None:
        node = self.nodes[0]
    cmd = fix_opts_for_new_jvm(node)
    cmd += "%(kafka_metadata_quorum_cmd)s describe --status" % {
        'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node)
    }
    self.logger.info("Running describe quorum command...\n%s" % cmd)
    # ssh_capture executes the command itself, so it is run exactly once here;
    # issuing a separate ssh(cmd) first would only execute it a second time.
    return "".join(node.account.ssh_capture(cmd))
def add_controller(self, controllerId, controller):
    """Run the metadata quorum add-controller command on the node being added.

    A command config file describing the new controller (node id, role,
    metadata log dir and listener settings of this service) is written to the
    node first and passed to the tool via --command-config.

    :param controllerId: node.id of the controller to add
    :param controller: the node being added; the command is run on this node
    """
    command_config_path = os.path.join(KafkaService.PERSISTENT_ROOT, "controller_command_config.properties")

    property_lines = [
        f"{config_property.NODE_ID}={controllerId}",
        f"{config_property.PROCESS_ROLES}=controller",
        f"{config_property.METADATA_LOG_DIR}={KafkaService.METADATA_LOG_DIR}",
        f"{config_property.ADVERTISED_LISTENERS}={self.advertised_listeners}",
        f"{config_property.LISTENERS}={self.listeners}",
        f"{config_property.CONTROLLER_LISTENER_NAMES}={self.controller_listener_names}",
    ]
    # The file content starts with an empty line and has no trailing newline.
    configs = "\n" + "\n".join(property_lines)
    controller.account.create_file(command_config_path, configs)

    cmd = fix_opts_for_new_jvm(controller)
    substitutions = {
        'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(controller, use_controller_bootstrap=True),
        'command_config': command_config_path,
    }
    cmd += "%(kafka_metadata_quorum_cmd)s --command-config %(command_config)s add-controller" % substitutions
    self.logger.info("Running add controller command...\n%s" % cmd)
    controller.account.ssh(cmd)
def remove_controller(self, controllerId, directoryId, node=None):
    """Run the metadata quorum remove-controller command.

    :param controllerId: node.id of the controller to remove (passed as -i)
    :param directoryId: directory id of the controller to remove (passed as -d)
    :param node: node to run the command from; when not specified the command
        is run from self.nodes[0]
    """
    target = self.nodes[0] if node is None else node

    cmd = fix_opts_for_new_jvm(target)
    substitutions = {
        'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(target, use_controller_bootstrap=True),
        'controller_id': controllerId,
        'directory_id': directoryId,
    }
    cmd += "%(kafka_metadata_quorum_cmd)s remove-controller -i %(controller_id)s -d %(directory_id)s" % substitutions
    self.logger.info("Running remove controller command...\n%s" % cmd)
    target.account.ssh(cmd)
def zk_connect_setting(self):
if self.quorum_info.using_kraft and not self.zk:
raise Exception("No zookeeper connect string available with KRaft unless ZooKeeper is explicitly enabled")
@ -1774,6 +1862,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for node in self.nodes
if node not in offline_nodes])
def __bootstrap_controllers(self, port, validate=True, offline_nodes=[]):
    """Join HOST:PORT entries for the controller nodes of the controller quorum.

    :param port: port mapping providing `open` and `port_number`
    :param validate: when True, raise if the port is not currently open
    :param offline_nodes: nodes to leave out of the result
    :return: comma-delimited HOST:PORT string
    :raises ValueError: if `validate` is set and the port is not open
    """
    if validate and not port.open:
        raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
                         str(port.port_number))

    quorum = self.controller_quorum
    # Only the leading num_nodes_controller_role nodes are considered.
    controller_nodes = quorum.nodes[:quorum.num_nodes_controller_role]

    endpoints = []
    for controller_node in controller_nodes:
        if controller_node in offline_nodes:
            continue
        endpoints.append(controller_node.account.hostname + ":" + str(port.port_number))
    return ','.join(endpoints)
def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, offline_nodes=[]):
"""Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
@ -1783,6 +1880,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.info("Bootstrap client port is: " + str(port_mapping.port_number))
return self.__bootstrap_servers(port_mapping, validate, offline_nodes)
def bootstrap_controllers(self, protocol='CONTROLLER_PLAINTEXT', validate=True, offline_nodes=[]):
    """Return the controllers of this cluster as HOSTNAME1:PORT1,HOSTNAME2:PORT2,...

    This is the comma-delimited format expected by many config files.

    :param protocol: key into self.port_mappings selecting the controller listener
    :param validate: forwarded to the private helper that builds the list
    :param offline_nodes: nodes to leave out of the result
    :return: comma-delimited HOST:PORT string
    """
    controller_port = self.port_mappings[protocol]
    port_text = str(controller_port.port_number)
    self.logger.info("Bootstrap client port is: " + port_text)

    return self.__bootstrap_controllers(controller_port, validate, offline_nodes)
def controller(self):
""" Get the controller node
"""

View File

@ -23,8 +23,12 @@ process.roles=controller
{% else %}
process.roles=broker
{% endif %}
# The connect string for the controller quorum
# The connect string for the controller quorum. Only one should be defined
{% if controller_quorum_bootstrap_servers %}
controller.quorum.bootstrap.servers={{ controller_quorum_bootstrap_servers }}
{% else %}
controller.quorum.voters={{ controller_quorum_voters }}
{% endif %}
controller.listener.names={{ controller_listener_names }}

View File

@ -0,0 +1,165 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
from functools import partial
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka.quorum import combined_kraft, ServiceQuorumInfo, isolated_kraft
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import DEV_BRANCH
#
# Test quorum reconfiguration for combined and isolated mode
#
class TestQuorumReconfiguration(ProduceConsumeValidateTest):
    """System test for dynamic KRaft quorum reconfiguration.

    Each test starts a cluster with a single active controller, starts a second
    controller as an observer, adds it as a voter, removes the original leader
    from the voter set, and checks the `describe --status` output after every
    step while a producer and a consumer are running.
    """

    def __init__(self, test_context):
        super(TestQuorumReconfiguration, self).__init__(test_context=test_context)

    def setUp(self):
        self.topic = "test_topic"
        self.partitions = 3
        self.replication_factor = 3
        # Producer and consumer
        self.producer_throughput = 1000
        self.num_producers = 1
        self.num_consumers = 1

    def _assert_quorum_state(self, leader_id, voter_ids, observer_ids):
        """Describe the quorum and assert its leader, voters and observers.

        :param leader_id: node id expected in the LeaderId line
        :param voter_ids: node ids expected in the CurrentVoters line
        :param observer_ids: node ids expected in the CurrentObservers line
        :return: the raw describe quorum output
        """
        output = self.kafka.describe_quorum()
        # The trailing \b keeps e.g. id 1 from also matching "LeaderId: 10".
        leader_pattern = r"LeaderId:\s*" + str(leader_id) + r"\b"
        assert re.search(leader_pattern, output), \
            "Expected leader %s in describe quorum output:\n%s" % (leader_id, output)
        assert_nodes_in_output(r"CurrentVoters:.*", output, *voter_ids)
        assert_nodes_in_output(r"CurrentObservers:.*", output, *observer_ids)
        return output

    def _create_clients(self, **producer_kwargs):
        """Create the producer and consumer used by run_produce_consume_validate.

        :param producer_kwargs: extra keyword arguments passed to VerifiableProducer
        """
        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
                                           self.topic, throughput=self.producer_throughput,
                                           message_validator=is_int, compression_types=["none"],
                                           version=DEV_BRANCH, **producer_kwargs)
        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
                                        self.topic, new_consumer=True, consumer_timeout_ms=30000,
                                        message_validator=is_int, version=DEV_BRANCH)

    def perform_reconfig(self, active_controller_id, inactive_controller_id, inactive_controller, broker_ids):
        """Add the second controller to the quorum, then remove the original leader.

        :param active_controller_id: node id of the controller that is already running
        :param inactive_controller_id: node id of the controller that has not been started yet
        :param inactive_controller: the node of the not-yet-started controller
        :param broker_ids: list of node ids expected to show up as observers
        """
        # The controller (first node) is the leader and the only voter
        self._assert_quorum_state(active_controller_id, [active_controller_id], broker_ids)

        # Start second controller; it joins as an observer
        self.kafka.controller_quorum.add_broker(inactive_controller)
        self._assert_quorum_state(active_controller_id, [active_controller_id],
                                  broker_ids + [inactive_controller_id])

        # Add controller to quorum; both controllers are now voters
        self.kafka.controller_quorum.add_controller(inactive_controller_id, inactive_controller)
        output = self._assert_quorum_state(active_controller_id,
                                           [active_controller_id, inactive_controller_id],
                                           broker_ids)

        # Remove leader from quorum; its directory id is read from the voters line
        voters = json_from_line(r"CurrentVoters:.*", output)
        directory_id = next(voter["directoryId"] for voter in voters if voter["id"] == active_controller_id)
        self.kafka.controller_quorum.remove_controller(active_controller_id, directory_id)

        # The second controller is now the leader and the only voter
        self._assert_quorum_state(inactive_controller_id, [inactive_controller_id], broker_ids)

    @cluster(num_nodes=6)
    @matrix(metadata_quorum=[combined_kraft])
    def test_combined_mode_reconfig(self, metadata_quorum):
        """Reconfigure the quorum when the controllers are nodes of the Kafka service itself."""
        self.kafka = KafkaService(self.test_context,
                                  num_nodes=4,
                                  zk=None,
                                  topics={self.topic: {"partitions": self.partitions,
                                                       "replication-factor": self.replication_factor,
                                                       'configs': {"min.insync.replicas": 1}}},
                                  version=DEV_BRANCH,
                                  controller_num_nodes_override=2,
                                  dynamicRaftQuorum=True)
        # Start one out of two controllers (standalone mode)
        inactive_controller = self.kafka.nodes[1]
        self.kafka.start(nodes_to_skip=[inactive_controller])

        # Create producer and consumer; the skipped node is reported as offline to the producer
        self._create_clients(offline_nodes=[inactive_controller])

        # Perform reconfigurations
        self.run_produce_consume_validate(
            core_test_action=lambda: self.perform_reconfig(self.kafka.idx(self.kafka.nodes[0]),
                                                           self.kafka.idx(inactive_controller),
                                                           inactive_controller,
                                                           [self.kafka.idx(node) for node in self.kafka.nodes[2:]]))

    @cluster(num_nodes=7)
    @matrix(metadata_quorum=[isolated_kraft])
    def test_isolated_mode_reconfig(self, metadata_quorum):
        """Reconfigure the quorum when the controllers run as a separate controller quorum service."""
        # Build the quorum info for isolated KRaft explicitly instead of deriving it from the test context
        remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
        self.kafka = KafkaService(self.test_context,
                                  num_nodes=3,
                                  zk=None,
                                  topics={self.topic: {"partitions": self.partitions,
                                                       "replication-factor": self.replication_factor,
                                                       'configs': {"min.insync.replicas": 1}}},
                                  version=DEV_BRANCH,
                                  controller_num_nodes_override=2,
                                  quorum_info_provider=remote_quorum,
                                  dynamicRaftQuorum=True)
        # Start one out of two controllers (standalone mode)
        controller_quorum = self.kafka.controller_quorum
        inactive_controller = controller_quorum.nodes[1]
        self.kafka.start(isolated_controllers_to_skip=[inactive_controller])

        # Create producer and consumer
        self._create_clients()

        # Perform reconfigurations
        self.run_produce_consume_validate(
            core_test_action=lambda: self.perform_reconfig(controller_quorum.node_id_as_isolated_controller(self.kafka.controller_quorum.nodes[0]),
                                                           controller_quorum.node_id_as_isolated_controller(inactive_controller),
                                                           inactive_controller,
                                                           [self.kafka.idx(node) for node in self.kafka.nodes]))
def assert_nodes_in_output(pattern, output, *node_ids):
    """Assert that the JSON array on the matched line lists the expected nodes.

    The array found on the first line matching `pattern` must contain as many
    entries as `node_ids`, and the "id" of every entry must be one of `node_ids`.

    :param pattern: regex selecting the line of interest, e.g. r"CurrentVoters:.*"
    :param output: describe quorum output to search
    :param node_ids: the node ids expected on that line
    :raises AssertionError: if the count differs or an unexpected id is present
    """
    nodes = json_from_line(pattern, output)
    expected_ids = list(node_ids)
    # Messages carry the parsed data so a failed system test run is diagnosable from its log.
    assert len(nodes) == len(node_ids), \
        "Expected %d node(s) %s for pattern %s but found %d: %s" % (
            len(expected_ids), expected_ids, pattern, len(nodes), nodes)
    for node in nodes:
        assert node["id"] in node_ids, \
            "Unexpected node id %s for pattern %s, expected one of %s" % (
                node["id"], pattern, expected_ids)
def json_from_line(pattern, output):
    """Parse the JSON array embedded in the first line of `output` matching `pattern`.

    The array is taken from the first '[' to the last ']' of the matched text.

    :param pattern: regex selecting the line of interest, e.g. r"CurrentVoters:.*"
    :param output: describe quorum output to search
    :return: the decoded JSON value (a list)
    :raises Exception: if `pattern` does not match anywhere in `output`
    :raises json.JSONDecodeError: if the matched text holds no bracketed array
        or the bracketed text is not valid JSON
    """
    match = re.search(pattern, output)
    if not match:
        raise Exception("Expected match for pattern %s in describe quorum output" % pattern)
    line = match.group(0)
    start_index = line.find('[')
    end_index = line.rfind(']') + 1
    if start_index < 0 or end_index <= start_index:
        # Without this guard the slice below is empty (or garbage) and json.loads
        # fails with an "Expecting value" error that names neither pattern nor line.
        raise json.JSONDecodeError(
            "Expected a JSON array in the line matched by pattern %s" % pattern, line, 0)
    return json.loads(line[start_index:end_index])