MINOR: Support Raft-based metadata quorums in system tests (#10093)

We need to be able to run system tests with Raft-based metadata quorums -- both
co-located brokers and controllers as well as remote controllers -- in addition to the
ZooKeeper-based mode we run today. This PR adds this capability to KafkaService in a
backwards-compatible manner as follows.

If no changes are made to existing system tests then they function as they always do --
they instantiate ZooKeeper, and Kafka will use ZooKeeper. On the other hand, if we want
to use a Raft-based metadata quorum we can do so by introducing a metadata_quorum
argument to the test method and using @matrix to set it to the quorums we want to use for
the various runs of the test. We then also have to skip creating a ZooKeeperService when
the quorum is Raft-based.

This PR does not update any tests -- those will come later after all the KIP-500 code is
merged.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Ron Dagostino 2021-02-11 12:44:17 -05:00 committed by GitHub
parent bf5e1f1cc0
commit faaef2c2df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 930 additions and 64 deletions

View File

@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The role of this server. Setting this puts us in kip-500 mode
process.roles=broker
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://localhost:9092
inter.broker.listener.name=PLAINTEXT
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://localhost:9092
# Listener, host name, and port for the controller to advertise to the brokers. If
# this server is a controller, this listener must be configured.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/raft-broker-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

View File

@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The role of this server. Setting this puts us in kip-500 mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://localhost:9092
# Listener, host name, and port for the controller to advertise to the brokers. If
# this server is a controller, this listener must be configured.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/raft-combined-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

View File

@ -0,0 +1,124 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The role of this server. Setting this puts us in kip-500 mode
process.roles=controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9093
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Listener, host name, and port for the controller to advertise to the brokers. If
# this server is a controller, this listener must be configured.
controller.listener.names=PLAINTEXT
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/raft-controller-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

View File

@ -22,11 +22,8 @@ class KafkaConfig(dict):
"""
DEFAULTS = {
config_property.PORT: 9092,
config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
config_property.LOG_DIRS: "/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2",
config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 18000,
config_property.ZOOKEEPER_SESSION_TIMEOUT_MS: 18000
}
def __init__(self, **kwargs):

View File

@ -18,6 +18,11 @@ Define Kafka configuration property names here.
"""
BROKER_ID = "broker.id"
NODE_ID = "node.id"
FIRST_BROKER_PORT = 9092
FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
FIRST_CONTROLLER_ID = 3001
CLUSTER_ID = "6bd37820-6745-4790-ae98-620300e1f61b"
PORT = "port"
ADVERTISED_HOSTNAME = "advertised.host.name"

View File

@ -24,9 +24,8 @@ from ducktape.utils.util import wait_until
from ducktape.cluster.remoteaccount import RemoteCommandError
from .config import KafkaConfig
from kafkatest.version import KafkaVersion
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import config_property
from kafkatest.services.kafka import config_property, quorum
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.security.minikdc import MiniKdc
from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
@ -53,6 +52,94 @@ class KafkaListener:
return "%s:%s" % (self.name, self.security_protocol)
class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
"""
Ducktape system test service for Brokers and Raft-based Controllers
Metadata Quorums
----------------
Kafka can use either ZooKeeper or a Raft Controller quorum for its
metadata. See the kafkatest.services.kafka.quorum.ServiceQuorumInfo
class for details.
Attributes
----------
quorum_info : kafkatest.services.kafka.quorum.ServiceQuorumInfo
Information about the service and its metadata quorum
num_nodes_broker_role : int
The number of nodes in the service that include 'broker'
in process.roles (0 when using Zookeeper)
num_nodes_controller_role : int
The number of nodes in the service that include 'controller'
in process.roles (0 when using Zookeeper)
controller_quorum : KafkaService
None when using ZooKeeper, otherwise the Kafka service for the
co-located case or the remote controller quorum service
instance for the remote case
remote_controller_quorum : KafkaService
None for the co-located case or when using ZooKeeper, otherwise
the remote controller quorum service instance
Kafka Security Protocols
------------------------
The security protocol advertised to clients and the inter-broker
security protocol can be set in the constructor and can be changed
afterwards as well. Set these attributes to make changes; they
take effect when starting each node:
security_protocol : str
default PLAINTEXT
client_sasl_mechanism : str
default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
interbroker_security_protocol : str
default PLAINTEXT
interbroker_sasl_mechanism : str
default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
ZooKeeper
---------
Create an instance of ZookeeperService when metadata_quorum is ZK
(ZK is the default if metadata_quorum is not a test parameter).
Raft Quorums
------------
Set metadata_quorum accordingly (to COLOCATED_RAFT or REMOTE_RAFT).
Do not instantiate a ZookeeperService instance.
Starting Kafka will cause any remote controller quorum to
automatically start first. Explicitly stopping Kafka does not stop
any remote controller quorum, but Ducktape will stop both when
tearing down the test (it will stop Kafka first).
Raft Security Protocols
--------------------------------
The broker-to-controller and inter-controller security protocols
will both initially be set to the inter-broker security protocol.
The broker-to-controller and inter-controller security protocols
must be identical for the co-located case (an exception will be
thrown when trying to start the service if they are not identical).
The broker-to-controller and inter-controller security protocols
can differ in the remote case.
Set these attributes for the co-located case. Changes take effect
when starting each node:
controller_security_protocol : str
default PLAINTEXT
controller_sasl_mechanism : str
default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
intercontroller_security_protocol : str
default PLAINTEXT
intercontroller_sasl_mechanism : str
default GSSAPI, ignored unless using SASL_PLAINTEXT or SASL_SSL
Set the same attributes for the remote case (changes take effect
when starting each quorum node), but you must first obtain the
service instance for the remote quorum via one of the
'controller_quorum' or 'remote_controller_quorum' attributes as
defined above.
"""
PERSISTENT_ROOT = "/mnt/kafka"
STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties")
@ -74,6 +161,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/admin_client_as_broker_jaas.conf"
KRB5_CONF = "java.security.krb5.conf=/mnt/security/krb5.conf"
SECURITY_PROTOCOLS = [SecurityConfig.PLAINTEXT, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL]
logs = {
"kafka_server_start_stdout_stderr": {
@ -103,14 +191,45 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
jmx_attributes=None, zk_connect_timeout=18000, zk_session_timeout=18000, server_prop_overides=None, zk_chroot=None,
zk_client_secure=False,
listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None,
extra_kafka_opts="", tls_version=None):
extra_kafka_opts="", tls_version=None,
remote_kafka=None,
controller_num_nodes_override=0,
):
"""
:param context: test context
:param int num_nodes: the number of nodes in the service. There are 4 possibilities:
1) Zookeeper quorum:
The number of brokers is defined by this parameter.
2) Co-located Raft quorum:
The number of nodes having a broker role is defined by this parameter.
The number of nodes having a controller role will by default be 1, 3, or 5 depending on num_nodes
(1 if num_nodes < 3, otherwise 3 if num_nodes < 5, otherwise 5). This calculation
can be overridden via controller_num_nodes_override, which must be between 1 and num_nodes,
inclusive, when non-zero. Here are some possibilities:
num_nodes = 1:
node 0: process.roles=broker,controller
num_nodes = 2:
node 0: process.roles=broker,controller
node 1: process.roles=broker
num_nodes = 3:
node 0: process.roles=broker,controller
node 1: process.roles=broker,controller
node 2: process.roles=broker,controller
num_nodes = 3, controller_num_nodes_override = 1
node 0: process.roles=broker,controller
node 1: process.roles=broker
node 2: process.roles=broker
3) Remote Raft quorum when instantiating the broker service:
The number of nodes, all of which will have process.roles=broker, is defined by this parameter.
4) Remote Raft quorum when instantiating the controller service:
The number of nodes, all of which will have process.roles=controller, is defined by this parameter.
The value passed in is determined by the broker service when that is instantiated, and it uses the
same algorithm as described above: 1, 3, or 5 unless controller_num_nodes_override is provided.
:param ZookeeperService zk:
:param dict topics: which topics to create automatically
:param str security_protocol: security protocol for clients to use
:param str tls_version: version of the TLS protocol.
:param str interbroker_security_protocol: security protocol to use for broker-to-broker communication
:param str interbroker_security_protocol: security protocol to use for broker-to-broker (and Raft controller-to-controller) communication
:param str client_sasl_mechanism: sasl mechanism for clients to use
:param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
:param str authorizer_class_name: which authorizer class to use
@ -120,18 +239,75 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
:param int zk_connect_timeout:
:param int zk_session_timeout:
:param dict server_prop_overides: overrides for kafka.properties file
:param zk_chroot:
:param str zk_chroot:
:param bool zk_client_secure: connect to Zookeeper over secure client port (TLS) when True
:param ListenerSecurityConfig listener_security_config: listener config to use
:param dict per_node_server_prop_overrides:
:param dict per_node_server_prop_overrides: overrides for kafka.properties file keyed by 0-based node number
:param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable
:param str tls_version: TLS version to use
:param KafkaService remote_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper
:param int controller_num_nodes_override: the number of controller nodes to use, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and remote_kafka is None; ignored otherwise
"""
self.zk = zk
self.remote_kafka = remote_kafka
self.quorum_info = quorum.ServiceQuorumInfo(self, context)
self.controller_quorum = None # will define below if necessary
self.remote_controller_quorum = None # will define below if necessary
if num_nodes < 1:
raise Exception("Must set a positive number of nodes: %i" % num_nodes)
self.num_nodes_broker_role = 0
self.num_nodes_controller_role = 0
if self.quorum_info.using_raft:
if self.quorum_info.has_brokers:
num_nodes_broker_role = num_nodes
if self.quorum_info.has_controllers:
self.num_nodes_controller_role = self.num_raft_controllers(num_nodes_broker_role, controller_num_nodes_override)
if self.remote_kafka:
raise Exception("Must not specify remote Kafka service with co-located Controller quorum")
else:
self.num_nodes_controller_role = num_nodes
if not self.remote_kafka:
raise Exception("Must specify remote Kafka service when instantiating remote Controller service (should not happen)")
# Initially use the inter-broker security protocol for both
# broker-to-controller and inter-controller communication. Both can be explicitly changed later if desired.
# Note, however, that the two must be the same if the controller quorum is co-located with the
# brokers. Different security protocols for the two are only supported with a remote controller quorum.
self.controller_security_protocol = interbroker_security_protocol
self.controller_sasl_mechanism = interbroker_sasl_mechanism
self.intercontroller_security_protocol = interbroker_security_protocol
self.intercontroller_sasl_mechanism = interbroker_sasl_mechanism
# Ducktape tears down services in the reverse order in which they are created,
# so create a service for the remote controller quorum (if we need one) first, before
# invoking Service.__init__(), so that Ducktape will tear down the quorum last; otherwise
# Ducktape will tear down the controller quorum first, which could lead to problems in
# Kafka and delays in tearing it down (and who knows what else -- it's simply better
# to correctly tear down Kafka first, before tearing down the remote controller).
if self.quorum_info.has_controllers:
self.controller_quorum = self
else:
num_remote_controller_nodes = self.num_raft_controllers(num_nodes, controller_num_nodes_override)
self.remote_controller_quorum = KafkaService(
context, num_remote_controller_nodes, None, security_protocol=self.controller_security_protocol,
interbroker_security_protocol=self.intercontroller_security_protocol,
client_sasl_mechanism=self.controller_sasl_mechanism, interbroker_sasl_mechanism=self.intercontroller_sasl_mechanism,
authorizer_class_name=authorizer_class_name, version=version, jmx_object_names=jmx_object_names,
jmx_attributes=jmx_attributes,
listener_security_config=listener_security_config,
extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
remote_kafka=self,
)
self.controller_quorum = self.remote_controller_quorum
Service.__init__(self, context, num_nodes)
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=KafkaService.PERSISTENT_ROOT)
self.zk = zk
self.security_protocol = security_protocol
self.tls_version = tls_version
self.client_sasl_mechanism = client_sasl_mechanism
@ -171,35 +347,79 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
# e.g. brokers to deregister after a hard kill.
self.zk_session_timeout = zk_session_timeout
self.port_mappings = {
'PLAINTEXT': KafkaListener('PLAINTEXT', 9092, 'PLAINTEXT', False),
'SSL': KafkaListener('SSL', 9093, 'SSL', False),
'SASL_PLAINTEXT': KafkaListener('SASL_PLAINTEXT', 9094, 'SASL_PLAINTEXT', False),
'SASL_SSL': KafkaListener('SASL_SSL', 9095, 'SASL_SSL', False),
broker_only_port_mappings = {
KafkaService.INTERBROKER_LISTENER_NAME:
KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, 9099, None, False)
KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, config_property.FIRST_BROKER_PORT + 7, None, False)
}
controller_only_port_mappings = {}
for idx, sec_protocol in enumerate(KafkaService.SECURITY_PROTOCOLS):
name_for_controller = self.controller_listener_name(sec_protocol)
broker_only_port_mappings[sec_protocol] = KafkaListener(sec_protocol, config_property.FIRST_BROKER_PORT + idx, sec_protocol, False)
controller_only_port_mappings[name_for_controller] = KafkaListener(name_for_controller, config_property.FIRST_CONTROLLER_PORT + idx, sec_protocol, False)
if self.quorum_info.using_zk or self.quorum_info.has_brokers and not self.quorum_info.has_controllers: # ZK or Raft broker-only
self.port_mappings = broker_only_port_mappings
elif self.quorum_info.has_brokers_and_controllers: # Raft broker+controller
self.port_mappings = broker_only_port_mappings.copy()
self.port_mappings.update(controller_only_port_mappings)
else: # Raft controller-only
self.port_mappings = controller_only_port_mappings
self.interbroker_listener = None
self.setup_interbroker_listener(interbroker_security_protocol, self.listener_security_config.use_separate_interbroker_listener)
if self.quorum_info.using_zk or self.quorum_info.has_brokers:
self.setup_interbroker_listener(interbroker_security_protocol, self.listener_security_config.use_separate_interbroker_listener)
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self._security_config = None
for node in self.nodes:
node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
node.version = version
node.config = KafkaConfig(**{
raft_broker_configs = {
config_property.PORT: config_property.FIRST_BROKER_PORT,
config_property.NODE_ID: self.idx(node),
}
zk_broker_configs = {
config_property.PORT: config_property.FIRST_BROKER_PORT,
config_property.BROKER_ID: self.idx(node),
config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: zk_connect_timeout,
config_property.ZOOKEEPER_SESSION_TIMEOUT_MS: zk_session_timeout
})
}
controller_only_configs = {
config_property.NODE_ID: self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1,
}
if node_quorum_info.service_quorum_info.using_zk:
node.config = KafkaConfig(**zk_broker_configs)
elif not node_quorum_info.has_broker_role: # Raft controller-only role
node.config = KafkaConfig(**controller_only_configs)
else: # Raft broker-only role or combined broker+controller roles
node.config = KafkaConfig(**raft_broker_configs)
def num_raft_controllers(self, num_nodes_broker_role, controller_num_nodes_override):
    """
    Decide how many Raft controllers to use.

    :param num_nodes_broker_role: number of nodes in the service that have the broker role
    :param controller_num_nodes_override: explicit controller count; 0 means "pick a default"
    :return: the override when it is non-zero, otherwise 1 controller for fewer than
        3 broker nodes, 3 controllers for fewer than 5 broker nodes, and 5 otherwise
    :raises Exception: if the override is negative, or if it exceeds the broker node
        count while the quorum type is co-located Raft
    """
    requested = controller_num_nodes_override
    if requested < 0:
        raise Exception("controller_num_nodes_override must not be negative: %i" % requested)
    # Co-located controllers live on the service's own nodes, so there cannot be more of them than nodes.
    if requested > num_nodes_broker_role and self.quorum_info.quorum_type == quorum.colocated_raft:
        raise Exception("controller_num_nodes_override must not exceed the service's node count in the co-located case: %i > %i" %
                        (requested, num_nodes_broker_role))
    if requested:
        return requested
    # No override given: scale the default quorum size with the number of broker nodes.
    for broker_count_limit, default_controllers in ((3, 1), (5, 3)):
        if num_nodes_broker_role < broker_count_limit:
            return default_controllers
    return 5
def set_version(self, version):
    """Assign the given version to every node of this service."""
    for service_node in self.nodes:
        setattr(service_node, "version", version)
def controller_listener_name(self, security_protocol_name):
    """
    Build the name of the controller listener for a security protocol.

    :param security_protocol_name: name of the security protocol, e.g. 'SSL'
    :return: the protocol name prefixed with 'CONTROLLER_'
    """
    listener_name = "CONTROLLER_%s" % security_protocol_name
    return listener_name
@property
def interbroker_security_protocol(self):
return self.interbroker_listener.security_protocol
# TODO: disentangle interbroker and intercontroller protocol information
return self.interbroker_listener.security_protocol if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.intercontroller_security_protocol
# this is required for backwards compatibility - there are a lot of tests that set this property explicitly
# meaning 'use one of the existing listeners that match given security protocol, do not use custom listener'
@ -222,21 +442,24 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
@property
def security_config(self):
if not self._security_config:
self._security_config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol,
zk_sasl=self.zk.zk_sasl, zk_tls=self.zk_client_secure,
client_sasl_mechanism=self.client_sasl_mechanism,
interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
listener_security_config=self.listener_security_config,
tls_version=self.tls_version)
client_sasl_mechanism_to_use = self.client_sasl_mechanism if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.controller_sasl_mechanism
interbroker_sasl_mechanism_to_use = self.interbroker_sasl_mechanism if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.intercontroller_sasl_mechanism
self._security_config = SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
zk_sasl=self.zk.zk_sasl if self.quorum_info.using_zk else False, zk_tls=self.zk_client_secure,
client_sasl_mechanism=client_sasl_mechanism_to_use,
interbroker_sasl_mechanism=interbroker_sasl_mechanism_to_use,
listener_security_config=self.listener_security_config,
tls_version=self.tls_version)
for port in self.port_mappings.values():
if port.open:
self._security_config.enable_security_protocol(port.security_protocol)
if self.zk.zk_sasl:
self._security_config.enable_sasl()
self._security_config.zk_sasl = self.zk.zk_sasl
if self.zk_client_secure:
self._security_config.enable_ssl()
self._security_config.zk_tls = self.zk_client_secure
if self.quorum_info.using_zk:
if self.zk.zk_sasl:
self._security_config.enable_sasl()
self._security_config.zk_sasl = self.zk.zk_sasl
if self.zk_client_secure:
self._security_config.enable_ssl()
self._security_config.zk_tls = self.zk_client_secure
return self._security_config
def open_port(self, listener_name):
@ -246,35 +469,62 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.port_mappings[listener_name].open = False
def start_minikdc_if_necessary(self, add_principals=""):
if self.security_config.has_sasl:
has_sasl = self.security_config.has_sasl if self.quorum_info.using_zk else \
self.security_config.has_sasl or self.controller_quorum.security_config.has_sasl if self.quorum_info.has_brokers else \
self.security_config.has_sasl or self.remote_kafka.security_config.has_sasl
if has_sasl:
if self.minikdc is None:
self.minikdc = MiniKdc(self.context, self.nodes, extra_principals = add_principals)
self.minikdc.start()
other_service = self.remote_kafka if self.remote_kafka else self.controller_quorum if self.quorum_info.using_raft else None
if not other_service or not other_service.minikdc:
nodes_for_kdc = self.nodes.copy()
if other_service and other_service != self:
nodes_for_kdc += other_service.nodes
self.minikdc = MiniKdc(self.context, nodes_for_kdc, extra_principals = add_principals)
self.minikdc.start()
else:
self.minikdc = None
if self.quorum_info.using_raft:
self.controller_quorum.minikdc = None
if self.remote_kafka:
self.remote_kafka.minikdc = None
def alive(self, node):
return len(self.pids(node)) > 0
def start(self, add_principals=""):
if self.zk_client_secure and not self.zk.zk_client_secure_port:
if self.quorum_info.using_zk and self.zk_client_secure and not self.zk.zk_client_secure_port:
raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled")
self.open_port(self.security_protocol)
self.interbroker_listener.open = True
if self.quorum_info.has_brokers_and_controllers and (
self.controller_security_protocol != self.intercontroller_security_protocol or
self.controller_sasl_mechanism != self.intercontroller_sasl_mechanism):
raise Exception("Co-located Raft-based Brokers (%s/%s) and Controllers (%s/%s) cannot talk to Controllers via different security protocols" %
(self.controller_security_protocol, self.controller_sasl_mechanism,
self.intercontroller_security_protocol, self.intercontroller_sasl_mechanism))
if self.quorum_info.using_zk or self.quorum_info.has_brokers:
self.open_port(self.security_protocol)
self.interbroker_listener.open = True
# we have to wait to decide whether to open the controller port(s)
# because it could be dependent on the particular node in the
# co-located case where the number of controllers could be less
# than the number of nodes in the service
self.start_minikdc_if_necessary(add_principals)
self._ensure_zk_chroot()
if self.quorum_info.using_zk:
self._ensure_zk_chroot()
if self.remote_controller_quorum:
self.remote_controller_quorum.start()
Service.start(self)
self.logger.info("Waiting for brokers to register at ZK")
if self.quorum_info.using_zk:
self.logger.info("Waiting for brokers to register at ZK")
retries = 30
expected_broker_ids = set(self.nodes)
wait_until(lambda: {node for node in self.nodes if self.is_registered(node)} == expected_broker_ids, 30, 1)
retries = 30
expected_broker_ids = set(self.nodes)
wait_until(lambda: {node for node in self.nodes if self.is_registered(node)} == expected_broker_ids, 30, 1)
if retries == 0:
raise RuntimeError("Kafka servers didn't register at ZK within 30 seconds")
if retries == 0:
raise RuntimeError("Kafka servers didn't register at ZK within 30 seconds")
# Create topics if necessary
if self.topics is not None:
@ -300,16 +550,25 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
advertised_listeners = []
protocol_map = []
controller_listener_names = self.controller_listener_name_list()
for port in self.port_mappings.values():
if port.open:
listeners.append(port.listener())
advertised_listeners.append(port.advertised_listener(node))
if not port.name in controller_listener_names:
advertised_listeners.append(port.advertised_listener(node))
protocol_map.append(port.listener_security_protocol())
controller_sec_protocol = self.remote_controller_quorum.controller_security_protocol if self.remote_controller_quorum \
else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
else None
if controller_sec_protocol:
protocol_map.append("%s:%s" % (self.controller_listener_name(controller_sec_protocol), controller_sec_protocol))
self.listeners = ','.join(listeners)
self.advertised_listeners = ','.join(advertised_listeners)
self.listener_security_protocol_map = ','.join(protocol_map)
self.interbroker_bootstrap_servers = self.__bootstrap_servers(self.interbroker_listener, True)
if self.quorum_info.using_zk or self.quorum_info.has_brokers:
self.interbroker_bootstrap_servers = self.__bootstrap_servers(self.interbroker_listener, True)
def prop_file(self, node):
self.set_protocol_and_port(node)
@ -324,13 +583,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
#load specific test override configs
override_configs = KafkaConfig(**node.config)
override_configs[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
override_configs[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
if self.zk_client_secure:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'true'
override_configs[config_property.ZOOKEEPER_CLIENT_CNXN_SOCKET] = 'org.apache.zookeeper.ClientCnxnSocketNetty'
else:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
if self.quorum_info.using_zk or self.quorum_info.has_brokers:
override_configs[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
if self.quorum_info.using_zk:
override_configs[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
if self.zk_client_secure:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'true'
override_configs[config_property.ZOOKEEPER_CLIENT_CNXN_SOCKET] = 'org.apache.zookeeper.ClientCnxnSocketNetty'
else:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
for prop in self.server_prop_overides:
override_configs[prop[0]] = prop[1]
@ -370,11 +631,40 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
KafkaService.STDOUT_STDERR_CAPTURE)
return cmd
def controller_listener_name_list(self):
    """
    List the controller listener names relevant to this service.

    :return: an empty list when ZooKeeper is in use; otherwise the listener name for
        the controller quorum's controller security protocol, followed by the listener
        name for its inter-controller security protocol when the two protocols differ
    """
    if self.quorum_info.using_zk:
        return []
    # The first entry is always the listener that brokers use to reach the controllers.
    names = [self.controller_listener_name(self.controller_quorum.controller_security_protocol)]
    same_protocol = self.controller_quorum.intercontroller_security_protocol == self.controller_quorum.controller_security_protocol
    if not same_protocol:
        names.append(self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol))
    return names
def start_node(self, node, timeout_sec=60):
node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
self.node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
if self.quorum_info.has_controllers:
for controller_listener in self.controller_listener_name_list():
if self.node_quorum_info.has_controller_role:
self.open_port(controller_listener)
else: # co-located case where node doesn't have a controller
self.close_port(controller_listener)
self.security_config.setup_node(node)
self.maybe_setup_broker_scram_credentials(node)
if self.quorum_info.using_zk or self.quorum_info.has_brokers: # TODO: SCRAM currently unsupported for controller quorum
self.maybe_setup_broker_scram_credentials(node)
if self.quorum_info.using_raft:
# define controller.quorum.voters text
security_protocol_to_use = self.controller_quorum.controller_security_protocol
first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID
self.controller_quorum_voters = ','.join(["%s@%s:%s" %
(self.controller_quorum.idx(node) + first_node_id - 1,
node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names
self.controller_listener_names = ','.join(self.controller_listener_name_list())
prop_file = self.prop_file(node)
self.logger.info("kafka.properties:")
@ -382,6 +672,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
if self.quorum_info.using_raft:
# format log directories if necessary
kafka_storage_script = self.path.script("kafka-storage.sh", node)
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
self.logger.info("Running log directory format command...\n%s" % cmd)
node.account.ssh(cmd)
cmd = self.start_cmd(node)
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
@ -390,12 +687,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
monitor.wait_until("Kafka\s*Server.*started", timeout_sec=timeout_sec, backoff_sec=.25,
err_msg="Kafka server didn't finish startup in %d seconds" % timeout_sec)
# Credentials for inter-broker communication are created before starting Kafka.
# Client credentials are created after starting Kafka so that both loading of
# existing credentials from ZK and dynamic update of credentials in Kafka are tested.
# We use the admin client and connect as the broker user when creating the client (non-broker) credentials
# if Kafka supports KIP-554, otherwise we use ZooKeeper.
self.maybe_setup_client_scram_credentials(node)
if self.quorum_info.using_zk or self.quorum_info.has_brokers: # TODO: SCRAM currently unsupported for controller quorum
# Credentials for inter-broker communication are created before starting Kafka.
# Client credentials are created after starting Kafka so that both loading of
# existing credentials from ZK and dynamic update of credentials in Kafka are tested.
# We use the admin client and connect as the broker user when creating the client (non-broker) credentials
# if Kafka supports KIP-554, otherwise we use ZooKeeper.
self.maybe_setup_client_scram_credentials(node)
self.start_jmx_tool(self.idx(node), node)
if len(self.pids(node)) == 0:
@ -448,6 +746,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False)
def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None):
if self.quorum_info.using_raft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-topics against a broker, not a Raft controller")
if force_use_zk_connection:
bootstrap_server_or_zookeeper = "--zookeeper %s" % (self.zk_connect_setting())
skip_optional_security_settings = True
@ -485,6 +785,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
bootstrap_server_or_zookeeper, optional_command_config_suffix)
def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None):
if self.quorum_info.using_raft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-configs against a broker, not a Raft controller")
if force_use_zk_connection:
# kafka-configs supports a TLS config file, so include it if there is one
bootstrap_server_or_zookeeper = "--zookeeper %s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption())
@ -719,6 +1021,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
node.account.ssh(cmd)
def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
if self.quorum_info.using_raft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-acls against a broker, not a Raft controller")
force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
if force_use_zk_connection:
bootstrap_server_or_authorizer_zk_props = "--authorizer-properties zookeeper.connect=%s" % (self.zk_connect_setting())
@ -913,6 +1217,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return missing
def restart_cluster(self, clean_shutdown=True, timeout_sec=60, after_each_broker_restart=None, *args):
# We do not restart the remote controller quorum if it exists.
# This is not widely used -- it typically appears in rolling upgrade tests --
# so we will let tests explicitly decide if/when to restart any remote controller quorum.
for node in self.nodes:
self.restart_node(node, clean_shutdown=clean_shutdown, timeout_sec=timeout_sec)
if after_each_broker_restart is not None:
@ -1021,6 +1328,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def cluster_id(self):
""" Get the current cluster id
"""
if self.quorum_info.using_raft:
return config_property.CLUSTER_ID
self.logger.debug("Querying ZooKeeper to retrieve cluster id")
cluster = self.zk.query("/cluster/id", chroot=self.zk_chroot)
@ -1031,6 +1341,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
raise
def topic_id(self, topic):
if self.quorum_info.using_raft:
raise Exception("Not yet implemented: Cannot obtain topic ID information when using Raft instead of ZooKeeper")
self.logger.debug(
"Querying zookeeper to find assigned topic ID for topic %s." % topic)
zk_path = "/brokers/topics/%s" % topic
@ -1107,6 +1419,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return output
def zk_connect_setting(self):
    """
    Return the ZooKeeper connect string for this service.

    :return: whatever ``self.zk.connect_setting`` yields for this service's chroot and
        client-secure flag
    :raises Exception: if the service uses a Raft metadata quorum
    """
    if not self.quorum_info.using_raft:
        return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure)
    raise Exception("No zookeeper connect string available when using Raft instead of ZooKeeper")
def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
@ -1130,6 +1444,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def controller(self):
""" Get the controller node
"""
if self.quorum_info.using_raft:
raise Exception("Cannot obtain Controller node when using Raft instead of ZooKeeper")
self.logger.debug("Querying zookeeper to find controller broker")
controller_info = self.zk.query("/controller", chroot=self.zk_chroot)
@ -1147,6 +1463,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
"""
Check whether a broker is registered in Zookeeper
"""
if self.quorum_info.using_raft:
raise Exception("Cannot obtain broker registration information when using Raft instead of ZooKeeper")
self.logger.debug("Querying zookeeper to see if broker %s is registered", str(node))
broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node), chroot=self.zk_chroot)
self.logger.debug("Broker info: %s", broker_info)

View File

@ -0,0 +1,144 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the types of metadata quorums we support
zk = 'ZK' # ZooKeeper, used before/during the KIP-500 bridge release(s)
colocated_raft = 'COLOCATED_RAFT' # co-located KIP-500 Controllers, used during/after the KIP-500 bridge release(s)
remote_raft = 'REMOTE_RAFT' # separate KIP-500 Controllers, used during/after the KIP-500 bridge release(s)
# How we will parameterize tests that exercise all quorum styles
# ["ZK", "REMOTE_RAFT", "COLOCATED_RAFT"] during the KIP-500 bridge release(s)
# ["REMOTE_RAFT", "COLOCATED_RAFT"] after the KIP-500 bridge release(s)
# NOTE: this module-level name shadows Python's builtin all() inside this module.
all = [zk, remote_raft, colocated_raft]
# How we will parameterize tests that exercise all Raft quorum styles
all_raft = [remote_raft, colocated_raft]
# How we will parameterize tests that are unrelated to upgrades:
# ["ZK"] before the KIP-500 bridge release(s)
# ["ZK", "REMOTE_RAFT"] during the KIP-500 bridge release(s) and in preview releases
# ["REMOTE_RAFT"] after the KIP-500 bridge release(s)
all_non_upgrade = [zk, remote_raft]
def for_test(test_context):
    """
    Determine the metadata quorum type for a test.

    A test uses ZooKeeper if it doesn't specify a metadata quorum or if it
    explicitly specifies ZooKeeper.

    :param test_context: the test context; its ``injected_args`` mapping (which may be
        empty or None) is consulted for a 'metadata_quorum' entry
    :return: the quorum type for the test
    :raises Exception: if the requested value is not one of the supported quorum types
    """
    arg_name = 'metadata_quorum'
    injected = test_context.injected_args
    if injected:
        quorum_type = injected.get(arg_name, zk)
    else:
        # no injected arguments at all, so fall back to the ZooKeeper default
        quorum_type = zk
    if quorum_type not in all:
        raise Exception("Unknown %s value provided for the test: %s" % (arg_name, quorum_type))
    return quorum_type
class ServiceQuorumInfo:
    """
    Exposes quorum-related information for a KafkaService

    Kafka can use either ZooKeeper or a Raft Controller quorum for its
    metadata.  Raft Controllers can either be co-located with Kafka in
    the same JVM or remote in separate JVMs.  The choice is made via
    the 'metadata_quorum' parameter defined for the system test: if it
    is not explicitly defined, or if it is set to 'ZK', then ZooKeeper
    is used.  If it is explicitly set to 'COLOCATED_RAFT' then Raft
    controllers will be co-located with the brokers; the value
    `REMOTE_RAFT` indicates remote controllers.

    Attributes
    ----------

    kafka : KafkaService
        The service for which this instance exposes quorum-related
        information
    quorum_type : str
        COLOCATED_RAFT, REMOTE_RAFT, or ZK
    using_zk : bool
        True iff quorum_type==ZK
    using_raft : bool
        False iff quorum_type==ZK
    has_brokers : bool
        Whether there is at least one node with process.roles
        containing 'broker'.  True iff using_raft and the Kafka
        service doesn't itself have a remote Kafka service (meaning
        it is not a remote controller quorum).
    has_controllers : bool
        Whether there is at least one node with process.roles
        containing 'controller'.  True iff quorum_type ==
        COLOCATED_RAFT or the Kafka service itself has a remote Kafka
        service (meaning it is a remote controller quorum).
    has_brokers_and_controllers : bool
        True iff quorum_type==COLOCATED_RAFT
    """

    def __init__(self, kafka, context):
        """
        :param kafka : KafkaService
            The service for which this instance exposes quorum-related
            information
        :param context : TestContext
            The test context within which this instance and the
            given Kafka service are being instantiated
        :raises Exception: if a ZooKeeper service is supplied together with a
            Raft quorum type, or if a remote Kafka service is supplied without
            the remote Raft quorum type
        """
        quorum_type = for_test(context)
        if quorum_type != zk and kafka.zk:
            raise Exception("Cannot use ZooKeeper while specifying a Raft metadata quorum (should not happen)")
        if kafka.remote_kafka and quorum_type != remote_raft:
            raise Exception("Cannot specify a remote Kafka service unless using a remote Raft metadata quorum (should not happen)")
        self.kafka = kafka
        self.quorum_type = quorum_type
        self.using_zk = quorum_type == zk
        self.using_raft = not self.using_zk
        # A service that has a remote Kafka service is itself the remote controller quorum.
        # Coerce to a real bool: a bare `x or kafka.remote_kafka` would otherwise store the
        # service object (or None) in has_controllers instead of the documented True/False.
        is_remote_controller_quorum = bool(kafka.remote_kafka)
        self.has_brokers = self.using_raft and not is_remote_controller_quorum
        self.has_controllers = quorum_type == colocated_raft or is_remote_controller_quorum
        self.has_brokers_and_controllers = quorum_type == colocated_raft
class NodeQuorumInfo:
    """
    Quorum-related facts about a single node of a KafkaService.

    Attributes
    ----------

    service_quorum_info : ServiceQuorumInfo
        Quorum information for the service that owns the node
    has_broker_role : bool
        Copied from the service-level ``has_brokers`` value
    has_controller_role : bool
        True iff the node's position in the service's node list is
        below the service's ``num_nodes_controller_role``, i.e. the
        node is one of the first N nodes where N is the number of
        nodes that have a controller role
    has_combined_broker_and_controller_roles :
        True iff has_broker_role and has_controller_role are both True
    """

    def __init__(self, service_quorum_info, node):
        """
        :param service_quorum_info : ServiceQuorumInfo
            Quorum information for the service that owns the node
        :param node : Node
            The node being described.  In the co-located case only
            some nodes may carry the controller role, namely when
            there are fewer controller nodes than nodes in the
            service, so the answer depends on the particular node.
        """
        self.service_quorum_info = service_quorum_info
        owning_service = self.service_quorum_info.kafka
        self.has_broker_role = self.service_quorum_info.has_brokers
        # Controllers occupy the leading positions of the service's node list.
        node_position = owning_service.nodes.index(node)
        self.has_controller_role = node_position < owning_service.num_nodes_controller_role
        self.has_combined_broker_and_controller_roles = self.has_broker_role and self.has_controller_role

View File

@ -14,18 +14,36 @@
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
advertised.host.name={{ node.account.hostname }}
{% if quorum_info.using_raft %}
# The role(s) of this server. Setting this puts us in Raft metadata quorum mode
{% if node_quorum_info.has_combined_broker_and_controller_roles %}
process.roles=broker,controller
{% elif node_quorum_info.has_controller_role %}
process.roles=controller
{% else %}
process.roles=broker
{% endif %}
# The connect string for the controller quorum
controller.quorum.voters={{ controller_quorum_voters }}
controller.listener.names={{ controller_listener_names }}
{% endif %}
listeners={{ listeners }}
advertised.listeners={{ advertised_listeners }}
listener.security.protocol.map={{ listener_security_protocol_map }}
{% if quorum_info.using_zk or quorum_info.has_brokers %}
advertised.host.name={{ node.account.hostname }}
advertised.listeners={{ advertised_listeners }}
{% if node.version.supports_named_listeners() %}
inter.broker.listener.name={{ interbroker_listener.name }}
{% else %}
security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
{% endif %}
{% endif %}
{% for k, v in listener_security_config.client_listener_overrides.items() %}
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
@ -35,6 +53,7 @@ listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }}
{% endif %}
{% endfor %}
{% if quorum_info.using_zk or quorum_info.has_brokers %}
{% if interbroker_listener.name != security_protocol %}
{% for k, v in listener_security_config.interbroker_listener_overrides.items() %}
{% if listener_security_config.requires_sasl_mechanism_prefix(k) %}
@ -44,6 +63,8 @@ listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }}
{% endif %}
{% endfor %}
{% endif %}
{% endif %}
{% if security_config.tls_version is not none %}
ssl.enabled.protocols={{ security_config.tls_version }}
ssl.protocol={{ security_config.tls_version }}
@ -56,6 +77,8 @@ ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
{% if quorum_info.using_zk %}
# Zookeeper TLS settings
#
# Note that zookeeper.ssl.client.enable will be set to true or false elsewhere, as appropriate.
@ -67,8 +90,11 @@ zookeeper.ssl.keystore.password=test-ks-passwd
{% endif %}
zookeeper.ssl.truststore.location=/mnt/security/test.truststore.jks
zookeeper.ssl.truststore.password=test-ts-passwd
{% endif %}
#
{% if quorum_info.using_zk or quorum_info.has_brokers %}
sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
{% endif %}
sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
sasl.kerberos.service.name=kafka
{% if authorizer_class_name is not none %}
@ -76,10 +102,12 @@ ssl.client.auth=required
authorizer.class.name={{ authorizer_class_name }}
{% endif %}
{% if quorum_info.using_zk %}
zookeeper.set.acl={{"true" if zk_set_acl else "false"}}
zookeeper.connection.timeout.ms={{ zk_connect_timeout }}
zookeeper.session.timeout.ms={{ zk_session_timeout }}
{% endif %}
{% if replica_lag is defined %}
replica.lag.time.max.ms={{replica_lag}}