MINOR: Fix ZooKeeperAuthorizerTest for KRaft (#11095)

This patch fixes the ZooKeeperAuthorizerTest for KRaft. The system test was not
configuring/reconfiguring/restarting the remote controller quorum with the correct security settings.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Ron Dagostino 2021-07-20 19:35:14 -04:00 committed by GitHub
parent 0314801a8e
commit 1e78dcda69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 0 deletions

View File

@ -55,9 +55,15 @@ class ZooKeeperAuthorizerTest(Test):
broker_security_protocol = "SSL"
broker_principal = "User:CN=systemtest"
client_security_protocol = "SASL_PLAINTEXT"
client_sasl_mechanism = 'GSSAPI'
client_principal = "User:client"
self.kafka.interbroker_security_protocol = broker_security_protocol
self.kafka.security_protocol = client_security_protocol
self.kafka.client_sasl_mechanism = client_sasl_mechanism
if self.kafka.quorum_info.using_kraft:
controller_quorum = self.kafka.controller_quorum
controller_quorum.controller_security_protocol = broker_security_protocol
controller_quorum.intercontroller_security_protocol = broker_security_protocol
self.kafka.start()
# alter client quotas
@ -70,6 +76,13 @@ class ZooKeeperAuthorizerTest(Test):
node.account.ssh(alter_client_quotas_cmd)
# set authorizer, restart with broker as super user
if (metadata_quorum == quorum.remote_kraft):
# we need to explicitly reconfigure/restart any remote controller quorum
self.kafka.logger.info("Restarting Remote KRaft Controller with authorizer and broker principal as super user")
controller_quorum = self.kafka.controller_quorum
controller_quorum.authorizer_class_name = KafkaService.ACL_AUTHORIZER
controller_quorum.server_prop_overrides = [["super.users", broker_principal]] # for broker to work with an authorizer
controller_quorum.restart_cluster()
self.kafka.logger.info("Restarting Kafka with authorizer and broker principal as super user")
self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
self.kafka.server_prop_overrides = [["super.users", broker_principal]] # for broker to work with an authorizer