mirror of https://github.com/apache/kafka.git
KAFKA-17624 Remove the E2E uses of accessing ACLs from zk (#17338)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
ccdf176d05
commit
2733268409
|
@ -19,45 +19,6 @@ class ACLs:
|
|||
def __init__(self, context):
|
||||
self.context = context
|
||||
|
||||
def set_acls(self, protocol, kafka, topic, group, force_use_zk_connection=False, additional_cluster_operations_to_grant = []):
|
||||
"""
|
||||
Creates ACls for the Kafka Broker principal that brokers use in tests
|
||||
|
||||
:param protocol: the security protocol to use (e.g. PLAINTEXT, SASL_PLAINTEXT, etc.)
|
||||
:param kafka: Kafka cluster upon which ClusterAction ACL is created
|
||||
:param topic: topic for which produce and consume ACLs are created
|
||||
:param group: consumer group for which consume ACL is created
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
|
||||
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
|
||||
:param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if the cluster is secured since these are required
|
||||
to create SCRAM credentials and topics, respectively
|
||||
"""
|
||||
# Set server ACLs
|
||||
kafka_principal = "User:CN=systemtest" if protocol == "SSL" else "User:kafka"
|
||||
self.add_cluster_acl(kafka, kafka_principal, force_use_zk_connection=force_use_zk_connection, additional_cluster_operations_to_grant = additional_cluster_operations_to_grant)
|
||||
self.add_read_acl(kafka, kafka_principal, "*", force_use_zk_connection=force_use_zk_connection)
|
||||
|
||||
# Set client ACLs
|
||||
client_principal = "User:CN=systemtest" if protocol == "SSL" else "User:client"
|
||||
self.add_produce_acl(kafka, client_principal, topic, force_use_zk_connection=force_use_zk_connection)
|
||||
self.add_consume_acl(kafka, client_principal, topic, group, force_use_zk_connection=force_use_zk_connection)
|
||||
|
||||
def _add_acl_on_topic(self, kafka, principal, topic, operation_flag, node, force_use_zk_connection):
|
||||
"""
|
||||
:param principal: principal for which ACL is created
|
||||
:param topic: topic for which ACL is created
|
||||
:param operation_flag: type of ACL created (e.g. --producer, --consumer, --operation=Read)
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available
|
||||
"""
|
||||
cmd = "%(cmd_prefix)s --add --topic=%(topic)s %(operation_flag)s --allow-principal=%(principal)s" % {
|
||||
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection),
|
||||
'topic': topic,
|
||||
'operation_flag': operation_flag,
|
||||
'principal': principal
|
||||
}
|
||||
kafka.run_cli_tool(node, cmd)
|
||||
|
||||
def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_grant = [], security_protocol=None):
|
||||
"""
|
||||
:param kafka: Kafka cluster upon which ClusterAction ACL is created
|
||||
|
@ -104,50 +65,3 @@ class ACLs:
|
|||
}
|
||||
kafka.logger.info(cmd)
|
||||
kafka.run_cli_tool(node, cmd)
|
||||
|
||||
def add_read_acl(self, kafka, principal, topic, force_use_zk_connection=False):
|
||||
"""
|
||||
:param kafka: Kafka cluster upon which Read ACL is created
|
||||
:param principal: principal for which Read ACL is created
|
||||
:param topic: topic for which Read ACL is created
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
|
||||
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
|
||||
"""
|
||||
node = kafka.nodes[0]
|
||||
|
||||
self._add_acl_on_topic(kafka, principal, topic, "--operation=Read", node, force_use_zk_connection)
|
||||
|
||||
def add_produce_acl(self, kafka, principal, topic, force_use_zk_connection=False):
|
||||
"""
|
||||
:param kafka: Kafka cluster upon which Producer ACL is created
|
||||
:param principal: principal for which Producer ACL is created
|
||||
:param topic: topic for which Producer ACL is created
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
|
||||
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
|
||||
"""
|
||||
node = kafka.nodes[0]
|
||||
|
||||
self._add_acl_on_topic(kafka, principal, topic, "--producer", node, force_use_zk_connection)
|
||||
|
||||
def add_consume_acl(self, kafka, principal, topic, group, force_use_zk_connection=False):
|
||||
"""
|
||||
:param kafka: Kafka cluster upon which Consumer ACL is created
|
||||
:param principal: principal for which Consumer ACL is created
|
||||
:param topic: topic for which Consumer ACL is created
|
||||
:param group: consumewr group for which Consumer ACL is created
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
|
||||
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
|
||||
"""
|
||||
node = kafka.nodes[0]
|
||||
|
||||
cmd = "%(cmd_prefix)s --add --topic=%(topic)s --group=%(group)s --consumer --allow-principal=%(principal)s" % {
|
||||
'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection),
|
||||
'topic': topic,
|
||||
'group': group,
|
||||
'principal': principal
|
||||
}
|
||||
kafka.run_cli_tool(node, cmd)
|
||||
|
||||
|
|
|
@ -283,20 +283,6 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
|
|||
output = self.nodes[0].account.ssh_output(cmd)
|
||||
self.logger.debug(output)
|
||||
|
||||
def list_acls(self, topic):
|
||||
"""
|
||||
List ACLs for the given topic using the AclCommand CLI
|
||||
"""
|
||||
|
||||
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
|
||||
cmd = "%s kafka.admin.AclCommand --authorizer-properties zookeeper.connect=%s %s --list --topic %s" % \
|
||||
(kafka_run_class, self.connect_setting(force_tls=self.zk_client_secure_port),
|
||||
self.zkTlsConfigFileOption(),
|
||||
topic)
|
||||
self.logger.debug(cmd)
|
||||
output = self.nodes[0].account.ssh_output(cmd)
|
||||
self.logger.debug(output)
|
||||
|
||||
def java_class_name(self):
|
||||
""" The class name of the Zookeeper quorum peers. """
|
||||
return "org.apache.zookeeper.server.quorum.QuorumPeerMain"
|
||||
|
|
|
@ -61,22 +61,6 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
|
|||
self.kafka.start_minikdc_if_necessary()
|
||||
self.kafka.restart_cluster(after_each_broker_restart = lambda: time.sleep(10))
|
||||
|
||||
def roll_in_secured_settings(self, client_protocol, broker_protocol):
|
||||
# Roll cluster to include inter broker security protocol.
|
||||
self.kafka.setup_interbroker_listener(broker_protocol)
|
||||
self.bounce()
|
||||
|
||||
# Roll cluster to disable PLAINTEXT port
|
||||
self.kafka.close_port(SecurityConfig.PLAINTEXT)
|
||||
self.set_authorizer_and_bounce(client_protocol, broker_protocol)
|
||||
|
||||
def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
|
||||
self.kafka.authorizer_class_name = KafkaService.ZK_ACL_AUTHORIZER
|
||||
# Force use of direct ZooKeeper access due to SecurityDisabledException: No Authorizer is configured on the broker.
|
||||
self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
|
||||
self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
|
||||
self.bounce() # enables the authorizer
|
||||
|
||||
def open_secured_port(self, client_protocol):
|
||||
self.kafka.security_protocol = client_protocol
|
||||
self.kafka.open_port(client_protocol)
|
||||
|
@ -88,17 +72,6 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
|
|||
self.kafka.start_minikdc_if_necessary()
|
||||
self.bounce()
|
||||
|
||||
def roll_in_sasl_mechanism(self, security_protocol, new_sasl_mechanism):
|
||||
# Roll cluster to update inter-broker SASL mechanism.
|
||||
# We need the inter-broker SASL mechanism to still be enabled through this roll.
|
||||
self.kafka.client_sasl_mechanism = "%s,%s" % (self.kafka.interbroker_sasl_mechanism, new_sasl_mechanism)
|
||||
self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
|
||||
self.bounce()
|
||||
|
||||
# Bounce again with ACLs for new mechanism.
|
||||
self.kafka.client_sasl_mechanism = new_sasl_mechanism # Removes old SASL mechanism completely
|
||||
self.set_authorizer_and_bounce(security_protocol, security_protocol)
|
||||
|
||||
def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
|
||||
# Enable the new internal listener on all brokers first
|
||||
self.kafka.open_port(self.kafka.INTERBROKER_LISTENER_NAME)
|
||||
|
@ -149,35 +122,6 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
|
|||
self.create_producer_and_consumer()
|
||||
self.run_produce_consume_validate(lambda: time.sleep(1))
|
||||
|
||||
@cluster(num_nodes=8)
|
||||
@matrix(client_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT],
|
||||
broker_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT])
|
||||
def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
|
||||
"""
|
||||
Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
|
||||
A third secure port is also open if inter-broker and client protocols are different.
|
||||
Start a Producer and Consumer via the SECURED client port
|
||||
Incrementally upgrade to add inter-broker be the secure broker protocol
|
||||
Incrementally upgrade again to add ACLs as well as disabling the PLAINTEXT port
|
||||
Ensure the producer and consumer ran throughout
|
||||
"""
|
||||
#Given we have a broker that has both secure and PLAINTEXT ports open
|
||||
self.kafka.security_protocol = client_protocol
|
||||
self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False)
|
||||
self.kafka.open_port(broker_protocol)
|
||||
# Set any SASL mechanism explicitly when it isn't already set by the client protocol
|
||||
is_broker_protocol_sasl = broker_protocol in [SecurityConfig.SASL_SSL, SecurityConfig.SASL_PLAINTEXT]
|
||||
is_client_protocol_sasl = client_protocol in [SecurityConfig.SASL_SSL, SecurityConfig.SASL_PLAINTEXT]
|
||||
if is_broker_protocol_sasl and not is_client_protocol_sasl:
|
||||
self.kafka.port_mappings[broker_protocol].sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
|
||||
self.kafka.start()
|
||||
|
||||
#Create Secured Producer and Consumer
|
||||
self.create_producer_and_consumer()
|
||||
|
||||
#Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
|
||||
self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
|
||||
|
||||
@cluster(num_nodes=9)
|
||||
@matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
|
||||
def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
|
||||
|
@ -202,29 +146,6 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
|
|||
self.create_producer_and_consumer()
|
||||
self.run_produce_consume_validate(lambda: time.sleep(1))
|
||||
|
||||
@cluster(num_nodes=8)
|
||||
@matrix(new_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
|
||||
def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
|
||||
"""
|
||||
Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one).
|
||||
Start Producer and Consumer using the second mechanism
|
||||
Incrementally upgrade to set inter-broker to the second mechanism and disable GSSAPI
|
||||
Incrementally upgrade again to add ACLs
|
||||
Ensure the producer and consumer run throughout
|
||||
"""
|
||||
#Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients
|
||||
self.kafka.security_protocol = SecurityConfig.SASL_SSL
|
||||
self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=False)
|
||||
self.kafka.client_sasl_mechanism = new_sasl_mechanism
|
||||
self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
|
||||
self.kafka.start()
|
||||
|
||||
#Create Producer and Consumer using second mechanism
|
||||
self.create_producer_and_consumer()
|
||||
|
||||
#Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout
|
||||
self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism)
|
||||
|
||||
@cluster(num_nodes=9)
|
||||
def test_enable_separate_interbroker_listener(self):
|
||||
"""
|
||||
|
|
|
@ -1,205 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.mark import parametrize
|
||||
from ducktape.mark.resource import cluster
|
||||
from ducktape.utils.util import wait_until
|
||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.kafka import config_property
|
||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
from kafkatest.utils.remote_account import java_version
|
||||
from kafkatest.version import LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
|
||||
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
|
||||
LATEST_3_7, LATEST_3_8, V_2_8_0, DEV_BRANCH, KafkaVersion
|
||||
from kafkatest.services.kafka.util import new_jdk_not_supported
|
||||
|
||||
class TestUpgrade(ProduceConsumeValidateTest):
|
||||
|
||||
def __init__(self, test_context):
|
||||
super(TestUpgrade, self).__init__(test_context=test_context)
|
||||
|
||||
def setUp(self):
|
||||
self.topic = "test_topic"
|
||||
self.partitions = 3
|
||||
self.replication_factor = 3
|
||||
|
||||
# Producer and consumer
|
||||
self.producer_throughput = 1000
|
||||
self.num_producers = 1
|
||||
self.num_consumers = 1
|
||||
|
||||
def wait_until_rejoin(self):
|
||||
for partition in range(0, self.partitions):
|
||||
wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
|
||||
backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
|
||||
|
||||
def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
|
||||
self.logger.info("Upgrade ZooKeeper from %s to %s" % (str(self.zk.nodes[0].version), str(DEV_BRANCH)))
|
||||
self.zk.set_version(DEV_BRANCH)
|
||||
self.zk.restart_cluster()
|
||||
# Confirm we have a successful ZooKeeper upgrade by List ACLs for the topic.
|
||||
# Not trying to detect a problem here leads to failure in the ensuing Kafka roll, which would be a less
|
||||
# intuitive failure than seeing a problem here, so detect ZooKeeper upgrade problems before involving Kafka.
|
||||
self.zk.list_acls(self.topic)
|
||||
# Do some stuff that exercises the use of ZooKeeper before we upgrade to the latest ZooKeeper client version
|
||||
self.logger.info("First pass bounce - rolling Kafka with old ZooKeeper client")
|
||||
for node in self.kafka.nodes:
|
||||
self.kafka.restart_node(node)
|
||||
topic_cfg = {
|
||||
"topic": "another_topic",
|
||||
"partitions": self.partitions,
|
||||
"replication-factor": self.replication_factor,
|
||||
"configs": {"min.insync.replicas": 2}
|
||||
}
|
||||
self.kafka.create_topic(topic_cfg)
|
||||
|
||||
self.logger.info("Second pass bounce - rolling upgrade")
|
||||
for node in self.kafka.nodes:
|
||||
self.kafka.stop_node(node)
|
||||
node.version = DEV_BRANCH
|
||||
node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
|
||||
node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
|
||||
self.kafka.start_node(node)
|
||||
self.wait_until_rejoin()
|
||||
|
||||
self.logger.info("Third pass bounce - remove inter.broker.protocol.version config")
|
||||
for node in self.kafka.nodes:
|
||||
self.kafka.stop_node(node)
|
||||
del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
|
||||
if to_message_format_version is None:
|
||||
del node.config[config_property.MESSAGE_FORMAT_VERSION]
|
||||
else:
|
||||
node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
|
||||
self.kafka.start_node(node)
|
||||
self.wait_until_rejoin()
|
||||
|
||||
@cluster(num_nodes=6)
|
||||
@parametrize(from_kafka_version=str(LATEST_3_8), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_8), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_8), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_7), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_7), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_7), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_6), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_6), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_6), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_4), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_4), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_4), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_2), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_2), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_2), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_1), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_1), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_1), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_0), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_0), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_3_0), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_7), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_7), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_7), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_6), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_6), to_message_format_version=None, compression_types=["lz4"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_6), to_message_format_version=None, compression_types=["snappy"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_5), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_5), to_message_format_version=None, compression_types=["zstd"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_4), to_message_format_version=None, compression_types=["none"])
|
||||
@parametrize(from_kafka_version=str(LATEST_2_4), to_message_format_version=None, compression_types=["zstd"])
|
||||
def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
|
||||
security_protocol="PLAINTEXT"):
|
||||
"""Test upgrade of Kafka broker cluster from various versions to the current version
|
||||
|
||||
from_kafka_version is a Kafka version to upgrade from
|
||||
|
||||
If to_message_format_version is None, it means that we will upgrade to default (latest)
|
||||
message format version. It is possible to upgrade to 0.10 brokers but still use message
|
||||
format version 0.9
|
||||
|
||||
- Start 3 node broker cluster on version 'from_kafka_version'
|
||||
- Start producer and consumer in the background
|
||||
- Perform two-phase rolling upgrade
|
||||
- First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to
|
||||
from_kafka_version and log.message.format.version set to from_kafka_version
|
||||
- Second phase: remove inter.broker.protocol.version config with rolling bounce; if
|
||||
to_message_format_version is set to 0.9, set log.message.format.version to
|
||||
to_message_format_version, otherwise remove log.message.format.version config
|
||||
- Finally, validate that every message acked by the producer was consumed by the consumer
|
||||
"""
|
||||
self.zk = ZookeeperService(self.test_context, num_nodes=1, version=KafkaVersion(from_kafka_version))
|
||||
fromKafkaVersion = KafkaVersion(from_kafka_version)
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
|
||||
version=fromKafkaVersion,
|
||||
topics={self.topic: {"partitions": self.partitions,
|
||||
"replication-factor": self.replication_factor,
|
||||
'configs': {"min.insync.replicas": 2}}})
|
||||
self.kafka.security_protocol = security_protocol
|
||||
self.kafka.interbroker_security_protocol = security_protocol
|
||||
|
||||
jdk_version = java_version(self.kafka.nodes[0])
|
||||
|
||||
if jdk_version > 9 and from_kafka_version in new_jdk_not_supported:
|
||||
self.logger.info("Test ignored! Kafka " + from_kafka_version + " not support jdk " + str(jdk_version))
|
||||
return
|
||||
|
||||
self.zk.start()
|
||||
self.kafka.start()
|
||||
|
||||
old_id = self.kafka.topic_id(self.topic)
|
||||
|
||||
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
|
||||
self.topic, throughput=self.producer_throughput,
|
||||
message_validator=is_int,
|
||||
compression_types=compression_types,
|
||||
version=KafkaVersion(from_kafka_version))
|
||||
|
||||
self.may_truncate_acked_records = False
|
||||
|
||||
new_consumer = fromKafkaVersion.consumer_supports_bootstrap_server()
|
||||
# TODO - reduce the timeout
|
||||
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
|
||||
self.topic, new_consumer=new_consumer, consumer_timeout_ms=30000,
|
||||
message_validator=is_int, version=KafkaVersion(from_kafka_version))
|
||||
|
||||
self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
|
||||
to_message_format_version))
|
||||
|
||||
cluster_id = self.kafka.cluster_id()
|
||||
assert cluster_id is not None
|
||||
assert len(cluster_id) == 22
|
||||
|
||||
assert self.kafka.all_nodes_support_topic_ids()
|
||||
new_id = self.kafka.topic_id(self.topic)
|
||||
if from_kafka_version >= V_2_8_0:
|
||||
assert old_id is not None
|
||||
assert new_id is not None
|
||||
assert old_id == new_id
|
||||
else:
|
||||
assert old_id is None
|
||||
assert new_id is not None
|
||||
|
||||
assert self.kafka.check_protocol_errors(self)
|
|
@ -1,112 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.mark import matrix
|
||||
from ducktape.mark.resource import cluster
|
||||
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.services.security.kafka_acls import ACLs
|
||||
from kafkatest.utils import is_int
|
||||
|
||||
class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
|
||||
"""Tests a rolling upgrade for zookeeper.
|
||||
"""
|
||||
|
||||
def __init__(self, test_context):
|
||||
super(ZooKeeperSecurityUpgradeTest, self).__init__(test_context=test_context)
|
||||
|
||||
def setUp(self):
|
||||
self.topic = "test_topic"
|
||||
self.group = "group"
|
||||
self.producer_throughput = 100
|
||||
self.num_producers = 1
|
||||
self.num_consumers = 1
|
||||
self.acls = ACLs(self.test_context)
|
||||
|
||||
self.zk = ZookeeperService(self.test_context, num_nodes=3)
|
||||
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
|
||||
"partitions": 3,
|
||||
"replication-factor": 3,
|
||||
'configs': {"min.insync.replicas": 2}}})
|
||||
|
||||
def create_producer_and_consumer(self):
|
||||
self.producer = VerifiableProducer(
|
||||
self.test_context, self.num_producers, self.kafka, self.topic,
|
||||
throughput=self.producer_throughput)
|
||||
|
||||
self.consumer = ConsoleConsumer(
|
||||
self.test_context, self.num_consumers, self.kafka, self.topic,
|
||||
consumer_timeout_ms=60000, message_validator=is_int)
|
||||
|
||||
self.consumer.group_id = self.group
|
||||
|
||||
@property
|
||||
def no_sasl(self):
|
||||
return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol == "SSL"
|
||||
|
||||
@property
|
||||
def is_secure(self):
|
||||
return self.kafka.security_protocol == "SASL_PLAINTEXT" \
|
||||
or self.kafka.security_protocol == "SSL" \
|
||||
or self.kafka.security_protocol == "SASL_SSL"
|
||||
|
||||
def run_zk_migration(self):
|
||||
# change zk config (auth provider + jaas login)
|
||||
self.zk.zk_sasl = True
|
||||
if self.no_sasl:
|
||||
self.kafka.start_minikdc_if_necessary(self.zk.zk_principals)
|
||||
# restart zk
|
||||
self.zk.restart_cluster()
|
||||
|
||||
# restart broker with jaas login
|
||||
self.kafka.restart_cluster()
|
||||
|
||||
# run migration tool
|
||||
for node in self.zk.nodes:
|
||||
self.zk.zookeeper_migration(node, "secure")
|
||||
|
||||
# restart broker with zookeeper.set.acl=true and acls
|
||||
self.kafka.zk_set_acl = True
|
||||
self.kafka.restart_cluster()
|
||||
|
||||
@cluster(num_nodes=9)
|
||||
@matrix(security_protocol=["PLAINTEXT", "SSL", "SASL_SSL", "SASL_PLAINTEXT"])
|
||||
def test_zk_security_upgrade(self, security_protocol):
|
||||
self.zk.start()
|
||||
self.kafka.security_protocol = security_protocol
|
||||
self.kafka.interbroker_security_protocol = security_protocol
|
||||
|
||||
# set acls
|
||||
if self.is_secure:
|
||||
self.kafka.authorizer_class_name = KafkaService.ZK_ACL_AUTHORIZER
|
||||
# Force use of direct ZooKeeper access because Kafka is not yet started
|
||||
self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True,
|
||||
additional_cluster_operations_to_grant=['Create'])
|
||||
|
||||
if self.no_sasl:
|
||||
self.kafka.start()
|
||||
else:
|
||||
self.kafka.start(self.zk.zk_principals)
|
||||
|
||||
#Create Producer and Consumer
|
||||
self.create_producer_and_consumer()
|
||||
|
||||
#Run upgrade
|
||||
self.run_produce_consume_validate(self.run_zk_migration)
|
|
@ -1,88 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.mark.resource import cluster
|
||||
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
|
||||
class ZookeeperTlsEncryptOnlyTest(ProduceConsumeValidateTest):
|
||||
"""Tests TLS encryption-only (ssl.clientAuth=none) connectivity to zookeeper.
|
||||
"""
|
||||
|
||||
def __init__(self, test_context):
|
||||
super(ZookeeperTlsEncryptOnlyTest, self).__init__(test_context=test_context)
|
||||
|
||||
def setUp(self):
|
||||
self.topic = "test_topic"
|
||||
self.group = "group"
|
||||
self.producer_throughput = 100
|
||||
self.num_producers = 1
|
||||
self.num_consumers = 1
|
||||
|
||||
self.zk = ZookeeperService(self.test_context, num_nodes=3,
|
||||
zk_client_port = False, zk_client_secure_port = True, zk_tls_encrypt_only = True)
|
||||
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, zk_client_secure=True, topics={self.topic: {
|
||||
"partitions": 3,
|
||||
"replication-factor": 3,
|
||||
'configs': {"min.insync.replicas": 2}}})
|
||||
|
||||
def create_producer_and_consumer(self):
|
||||
self.producer = VerifiableProducer(
|
||||
self.test_context, self.num_producers, self.kafka, self.topic,
|
||||
throughput=self.producer_throughput)
|
||||
|
||||
self.consumer = ConsoleConsumer(
|
||||
self.test_context, self.num_consumers, self.kafka, self.topic,
|
||||
consumer_timeout_ms=60000, message_validator=is_int)
|
||||
|
||||
self.consumer.group_id = self.group
|
||||
|
||||
def perform_produce_consume_validation(self):
|
||||
self.create_producer_and_consumer()
|
||||
self.run_produce_consume_validate()
|
||||
self.producer.free()
|
||||
self.consumer.free()
|
||||
|
||||
@cluster(num_nodes=9)
|
||||
def test_zk_tls_encrypt_only(self):
|
||||
self.zk.start()
|
||||
self.kafka.security_protocol = self.kafka.interbroker_security_protocol = "PLAINTEXT"
|
||||
|
||||
self.kafka.start()
|
||||
|
||||
self.perform_produce_consume_validation()
|
||||
|
||||
# Make sure the ZooKeeper command line is able to talk to a TLS-enabled, encrypt-only ZooKeeper quorum
|
||||
# Test both create() and query(), each of which leverages the ZooKeeper command line
|
||||
# This tests the code in org.apache.zookeeper.ZooKeeperMainWithTlsSupportForKafka
|
||||
path="/foo"
|
||||
value="{\"bar\": 0}"
|
||||
self.zk.create(path, value=value)
|
||||
if self.zk.query(path) != value:
|
||||
raise Exception("Error creating and then querying a znode using the CLI with a TLS-enabled ZooKeeper quorum")
|
||||
|
||||
# Make sure the ConfigCommand CLI is able to talk to a TLS-enabled, encrypt-only ZooKeeper quorum
|
||||
# This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
|
||||
self.zk.describeUsers()
|
||||
|
||||
# Make sure the AclCommand CLI is able to talk to a TLS-enabled, encrypt-only ZooKeeper quorum
|
||||
# This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
|
||||
self.zk.list_acls(self.topic)
|
|
@ -1,153 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.mark.resource import cluster
|
||||
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
|
||||
class ZookeeperTlsTest(ProduceConsumeValidateTest):
|
||||
"""Tests TLS connectivity to zookeeper.
|
||||
"""
|
||||
|
||||
def __init__(self, test_context):
|
||||
super(ZookeeperTlsTest, self).__init__(test_context=test_context)
|
||||
|
||||
def setUp(self):
|
||||
self.topic = "test_topic"
|
||||
self.group = "group"
|
||||
self.producer_throughput = 100
|
||||
self.num_producers = 1
|
||||
self.num_consumers = 1
|
||||
|
||||
self.zk = ZookeeperService(self.test_context, num_nodes=3)
|
||||
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
|
||||
"partitions": 3,
|
||||
"replication-factor": 3,
|
||||
'configs': {"min.insync.replicas": 2}}})
|
||||
|
||||
def create_producer_and_consumer(self):
|
||||
self.producer = VerifiableProducer(
|
||||
self.test_context, self.num_producers, self.kafka, self.topic,
|
||||
throughput=self.producer_throughput)
|
||||
|
||||
self.consumer = ConsoleConsumer(
|
||||
self.test_context, self.num_consumers, self.kafka, self.topic,
|
||||
consumer_timeout_ms=60000, message_validator=is_int)
|
||||
|
||||
self.consumer.group_id = self.group
|
||||
|
||||
def perform_produce_consume_validation(self):
|
||||
self.create_producer_and_consumer()
|
||||
self.run_produce_consume_validate()
|
||||
self.producer.free()
|
||||
self.consumer.free()
|
||||
|
||||
def enable_zk_tls(self):
|
||||
self.test_context.logger.debug("Enabling the TLS port in Zookeeper (we won't use it from Kafka yet)")
|
||||
# change zk config (enable TLS, but also keep non-TLS)
|
||||
self.zk.zk_client_secure_port = True
|
||||
self.zk.restart_cluster()
|
||||
# bounce a Kafka broker -- allows us to detect a broker restart failure as a simple sanity check
|
||||
self.kafka.stop_node(self.kafka.nodes[0])
|
||||
self.kafka.start_node(self.kafka.nodes[0])
|
||||
|
||||
def enable_kafka_zk_tls(self):
|
||||
self.test_context.logger.debug("Configuring Kafka to use the TLS port in Zookeeper")
|
||||
# change Kafka config (enable TLS to Zookeeper) and restart the Kafka cluster
|
||||
self.kafka.zk_client_secure = True
|
||||
self.kafka.restart_cluster()
|
||||
|
||||
def disable_zk_non_tls(self):
|
||||
self.test_context.logger.debug("Disabling the non-TLS port in Zookeeper (as a simple sanity check)")
|
||||
# change zk config (disable non-TLS, keep TLS) and restart the ZooKeeper cluster
|
||||
self.zk.zk_client_port = False
|
||||
self.zk.restart_cluster()
|
||||
# bounce a Kafka broker -- allows us to detect a broker restart failure as a simple sanity check
|
||||
self.kafka.stop_node(self.kafka.nodes[0])
|
||||
self.kafka.start_node(self.kafka.nodes[0])
|
||||
|
||||
@cluster(num_nodes=9)
|
||||
def test_zk_tls(self):
|
||||
self.zk.start()
|
||||
self.kafka.security_protocol = self.kafka.interbroker_security_protocol = "PLAINTEXT"
|
||||
|
||||
self.kafka.start()
|
||||
|
||||
# Enable TLS port in Zookeeper in addition to the regular non-TLS port
|
||||
# Bounces the ZooKeeper cluster (and a single broker as a sanity check)
|
||||
self.enable_zk_tls()
|
||||
|
||||
# Leverage ZooKeeper TLS port in Kafka
|
||||
# Bounces the Kafka cluster
|
||||
self.enable_kafka_zk_tls()
|
||||
self.perform_produce_consume_validation()
|
||||
|
||||
# Disable ZooKeeper non-TLS port to make sure we aren't using it
|
||||
# Bounces the ZooKeeper cluster (and a single broker as a sanity check)
|
||||
self.disable_zk_non_tls()
|
||||
|
||||
# Make sure the ZooKeeper command line is able to talk to a TLS-enabled ZooKeeper quorum
|
||||
# Test both create() and query(), each of which leverages the ZooKeeper command line
|
||||
# This tests the code in org.apache.zookeeper.ZooKeeperMainWithTlsSupportForKafka
|
||||
path="/foo"
|
||||
value="{\"bar\": 0}"
|
||||
self.zk.create(path, value=value)
|
||||
if self.zk.query(path) != value:
|
||||
raise Exception("Error creating and then querying a znode using the CLI with a TLS-enabled ZooKeeper quorum")
|
||||
|
||||
# Make sure the ConfigCommand CLI is able to talk to a TLS-enabled ZooKeeper quorum
|
||||
# This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
|
||||
self.zk.describeUsers()
|
||||
|
||||
# Make sure the AclCommand CLI is able to talk to a TLS-enabled ZooKeeper quorum
|
||||
# This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
|
||||
self.zk.list_acls(self.topic)
|
||||
|
||||
#
|
||||
# Test zookeeper.set.acl with just TLS mutual authentication (no SASL)
|
||||
#
|
||||
# Step 1: run migration tool
|
||||
self.zk.zookeeper_migration(self.zk.nodes[0], "secure")
|
||||
# Step 2: restart brokers with zookeeper.set.acl=true and acls (with TLS but no SASL)
|
||||
self.kafka.zk_set_acl = True
|
||||
self.kafka.restart_cluster()
|
||||
self.perform_produce_consume_validation()
|
||||
|
||||
#
|
||||
# Test zookeeper.set.acl with both SASL and TLS mutual authentication
|
||||
#
|
||||
# Step 1: remove ACLs created previously
|
||||
self.kafka.zk_set_acl = False
|
||||
self.kafka.restart_cluster()
|
||||
self.zk.zookeeper_migration(self.zk.nodes[0], "unsecure")
|
||||
# Step 2: enable ZooKeeper SASL authentication, but don't take advantage of it in Kafka yet
|
||||
self.zk.zk_sasl = True
|
||||
self.kafka.start_minikdc_if_necessary(self.zk.zk_principals)
|
||||
self.zk.restart_cluster()
|
||||
# bounce a Kafka broker -- allows us to detect a broker restart failure as a simple sanity check
|
||||
self.kafka.stop_node(self.kafka.nodes[0])
|
||||
self.kafka.start_node(self.kafka.nodes[0])
|
||||
# Step 3: run migration tool
|
||||
self.zk.zookeeper_migration(self.zk.nodes[0], "secure")
|
||||
# Step 4: restart brokers with zookeeper.set.acl=true and acls (with both TLS and SASL)
|
||||
self.kafka.zk_set_acl = True
|
||||
self.kafka.restart_cluster()
|
||||
self.perform_produce_consume_validation()
|
Loading…
Reference in New Issue