mirror of https://github.com/apache/kafka.git
KAFKA-16440: Update security_test.py to support KIP-848’s group protocol config (#15628)
Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢
Reviewers: Walker Carlson <wcarlson@apache.org>
This commit is contained in:
parent
6569a354e6
commit
6bb9caced0
|
@ -19,7 +19,7 @@ from ducktape.mark.resource import cluster
|
||||||
from ducktape.utils.util import wait_until
|
from ducktape.utils.util import wait_until
|
||||||
from ducktape.errors import TimeoutError
|
from ducktape.errors import TimeoutError
|
||||||
|
|
||||||
from kafkatest.services.kafka import quorum
|
from kafkatest.services.kafka import quorum, consumer_group
|
||||||
from kafkatest.services.security.security_config import SecurityConfig
|
from kafkatest.services.security.security_config import SecurityConfig
|
||||||
from kafkatest.services.security.security_config import SslStores
|
from kafkatest.services.security.security_config import SslStores
|
||||||
from kafkatest.tests.end_to_end import EndToEndTest
|
from kafkatest.tests.end_to_end import EndToEndTest
|
||||||
|
@ -61,28 +61,30 @@ class SecurityTest(EndToEndTest):
|
||||||
@matrix(
|
@matrix(
|
||||||
security_protocol=['PLAINTEXT'],
|
security_protocol=['PLAINTEXT'],
|
||||||
interbroker_security_protocol=['SSL'],
|
interbroker_security_protocol=['SSL'],
|
||||||
metadata_quorum=[quorum.zk],
|
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
|
||||||
use_new_coordinator=[False]
|
use_new_coordinator=[False]
|
||||||
)
|
)
|
||||||
@matrix(
|
@matrix(
|
||||||
security_protocol=['PLAINTEXT'],
|
security_protocol=['PLAINTEXT'],
|
||||||
interbroker_security_protocol=['SSL'],
|
interbroker_security_protocol=['SSL'],
|
||||||
metadata_quorum=[quorum.isolated_kraft],
|
metadata_quorum=[quorum.isolated_kraft],
|
||||||
use_new_coordinator=[True, False]
|
use_new_coordinator=[True],
|
||||||
|
group_protocol=consumer_group.all_group_protocols
|
||||||
)
|
)
|
||||||
@matrix(
|
@matrix(
|
||||||
security_protocol=['SSL'],
|
security_protocol=['SSL'],
|
||||||
interbroker_security_protocol=['PLAINTEXT'],
|
interbroker_security_protocol=['PLAINTEXT'],
|
||||||
metadata_quorum=[quorum.zk],
|
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
|
||||||
use_new_coordinator=[False]
|
use_new_coordinator=[False]
|
||||||
)
|
)
|
||||||
@matrix(
|
@matrix(
|
||||||
security_protocol=['SSL'],
|
security_protocol=['SSL'],
|
||||||
interbroker_security_protocol=['PLAINTEXT'],
|
interbroker_security_protocol=['PLAINTEXT'],
|
||||||
metadata_quorum=[quorum.isolated_kraft],
|
metadata_quorum=[quorum.isolated_kraft],
|
||||||
use_new_coordinator=[True, False]
|
use_new_coordinator=[True],
|
||||||
|
group_protocol=consumer_group.all_group_protocols
|
||||||
)
|
)
|
||||||
def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk, use_new_coordinator=False):
|
def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||||
"""
|
"""
|
||||||
Test that invalid hostname in certificate results in connection failures.
|
Test that invalid hostname in certificate results in connection failures.
|
||||||
When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
|
When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
|
||||||
|
@ -120,11 +122,11 @@ class SecurityTest(EndToEndTest):
|
||||||
# the inter-broker security protocol using TLS with a hostname verification failure
|
# the inter-broker security protocol using TLS with a hostname verification failure
|
||||||
# doesn't impact a producer in case of a single broker with a KRaft Controller,
|
# doesn't impact a producer in case of a single broker with a KRaft Controller,
|
||||||
# so confirm that this is in fact the observed behavior
|
# so confirm that this is in fact the observed behavior
|
||||||
self.create_and_start_clients(log_level="INFO")
|
self.create_and_start_clients(log_level="INFO", group_protocol=group_protocol)
|
||||||
self.run_validation()
|
self.run_validation()
|
||||||
else:
|
else:
|
||||||
# We need more verbose logging to catch the expected errors
|
# We need more verbose logging to catch the expected errors
|
||||||
self.create_and_start_clients(log_level="DEBUG")
|
self.create_and_start_clients(log_level="DEBUG", group_protocol=group_protocol)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30)
|
wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30)
|
||||||
|
@ -143,26 +145,27 @@ class SecurityTest(EndToEndTest):
|
||||||
|
|
||||||
SecurityConfig.ssl_stores.valid_hostname = True
|
SecurityConfig.ssl_stores.valid_hostname = True
|
||||||
self.kafka.restart_cluster()
|
self.kafka.restart_cluster()
|
||||||
self.create_and_start_clients(log_level="INFO")
|
self.create_and_start_clients(log_level="INFO", group_protocol=group_protocol)
|
||||||
self.run_validation()
|
self.run_validation()
|
||||||
|
|
||||||
def create_and_start_clients(self, log_level):
|
def create_and_start_clients(self, log_level, group_protocol):
|
||||||
self.create_producer(log_level=log_level)
|
self.create_producer(log_level=log_level)
|
||||||
self.producer.start()
|
self.producer.start()
|
||||||
|
|
||||||
self.create_consumer(log_level=log_level)
|
self.create_consumer(log_level=log_level, group_protocol=group_protocol)
|
||||||
self.consumer.start()
|
self.consumer.start()
|
||||||
|
|
||||||
@cluster(num_nodes=2)
|
@cluster(num_nodes=2)
|
||||||
@matrix(
|
@matrix(
|
||||||
metadata_quorum=[quorum.zk],
|
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
|
||||||
use_new_coordinator=[False]
|
use_new_coordinator=[False]
|
||||||
)
|
)
|
||||||
@matrix(
|
@matrix(
|
||||||
metadata_quorum=[quorum.isolated_kraft],
|
metadata_quorum=[quorum.isolated_kraft],
|
||||||
use_new_coordinator=[True, False]
|
use_new_coordinator=[True],
|
||||||
|
group_protocol=consumer_group.all_group_protocols
|
||||||
)
|
)
|
||||||
def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
|
def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||||
"""
|
"""
|
||||||
Test that invalid hostname in ZooKeeper or KRaft Controller results in broker inability to start.
|
Test that invalid hostname in ZooKeeper or KRaft Controller results in broker inability to start.
|
||||||
"""
|
"""
|
||||||
|
|
Loading…
Reference in New Issue