mirror of https://github.com/apache/kafka.git
MINOR: Migrate connect system tests to KRaft (#12621)
Adds the `metadata_quorum` parameter to the `@matrix(...)` annotations of many existing tests, so that they run against both the zookeeper and remote_kraft metadata quorums.

Reviewers: Randall Hauch <rhauch@gmail.com>, Greg Harris <gharris1727@gmail.com>
parent 47adb86636
commit 57aefa9c82
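The pattern applied throughout the diff below is the same in every file: each test's @matrix / @parametrize annotation gains a metadata_quorum dimension, ZooKeeper is only created and started when the test is parameterized with the zk quorum, and the test method accepts the extra metadata_quorum argument. The following is a minimal sketch of that pattern, assembled only from the kafkatest and ducktape calls visible in the hunks; the ExampleQuorumTest class and test_example method are hypothetical names for illustration and are not part of this commit.

from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test

from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.zookeeper import ZookeeperService


class ExampleQuorumTest(Test):
    """Hypothetical test illustrating the metadata_quorum migration pattern."""

    def __init__(self, test_context):
        super(ExampleQuorumTest, self).__init__(test_context)
        # Create ZooKeeper only when the test is parameterized with the zk quorum;
        # for remote_kraft the KRaft controller nodes are managed by KafkaService.
        self.zk = ZookeeperService(test_context, 1) if quorum.for_test(test_context) == quorum.zk else None
        self.kafka = KafkaService(test_context, 1, self.zk,
                                  topics={'test-topic': {'partitions': 1, 'replication-factor': 1}})

    @cluster(num_nodes=5)
    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'],
            metadata_quorum=quorum.all_non_upgrade)
    def test_example(self, connect_protocol, metadata_quorum):
        # metadata_quorum is injected by @matrix and read back through
        # quorum.for_test(self.test_context); the body only starts the services.
        if self.zk:
            self.zk.start()
        self.kafka.start()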
@@ -20,7 +20,7 @@ from ducktape.mark import matrix, parametrize
 from ducktape.cluster.remoteaccount import RemoteCommandError
 
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService, config_property
+from kafkatest.services.kafka import KafkaService, config_property, quorum
 from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError, MockSink, MockSource
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
@@ -75,7 +75,7 @@ class ConnectDistributedTest(Test):
             self.TOPIC: {'partitions': 1, 'replication-factor': 1}
         }
 
-        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
 
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
@@ -98,7 +98,8 @@ class ConnectDistributedTest(Test):
                 include_filestream_connectors=include_filestream_connectors)
         self.cc.log_level = "DEBUG"
 
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
 
     def _start_connector(self, config_file):
@@ -164,8 +165,8 @@ class ConnectDistributedTest(Test):
         return self._task_has_state(task_id, status, 'RUNNING')
 
     @cluster(num_nodes=5)
-    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_failed_connector(self, exactly_once_source, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum):
         self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
@@ -187,8 +188,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see connector transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_failed_task(self, connector_type, connect_protocol):
+    @matrix(connector_type=['source', 'exactly-once source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum):
         self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 'exactly-once source' else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
@@ -213,8 +214,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_connector_and_tasks_failed_connector(self, connect_protocol):
+    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -232,8 +233,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see connector transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol):
+    @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol, metadata_quorum):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -257,8 +258,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING state")
 
     @cluster(num_nodes=5)
-    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_and_resume_source(self, exactly_once_source, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_pause_and_resume_source(self, exactly_once_source, connect_protocol, metadata_quorum):
         """
         Verify that source connectors stop producing records when paused and begin again after
         being resumed.
@@ -299,8 +300,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to produce messages after resuming source connector")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_and_resume_sink(self, connect_protocol):
+    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum):
         """
         Verify that sink connectors stop consuming records when paused and begin again after
         being resumed.
@@ -347,8 +348,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to consume messages after resuming sink connector")
 
     @cluster(num_nodes=5)
-    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_pause_state_persistent(self, exactly_once_source, connect_protocol):
+    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_pause_state_persistent(self, exactly_once_source, connect_protocol, metadata_quorum):
         """
         Verify that paused state is preserved after a cluster restart.
         """
@@ -375,8 +376,8 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see connector startup in PAUSED state")
 
     @cluster(num_nodes=6)
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol):
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL], exactly_once_source=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum):
         """
         Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
         correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
@@ -409,8 +410,8 @@ class ConnectDistributedTest(Test):
         wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=150, err_msg="Sink output file never converged to the same state as the input file")
 
     @cluster(num_nodes=6)
-    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_bounce(self, clean, connect_protocol):
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_bounce(self, clean, connect_protocol, metadata_quorum):
         """
         Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
         run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces,
@@ -537,8 +538,8 @@ class ConnectDistributedTest(Test):
         assert success, "Found validation errors:\n" + "\n ".join(errors)
 
     @cluster(num_nodes=6)
-    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_exactly_once_source(self, clean, connect_protocol):
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum):
         """
         Validates that source tasks run correctly and deliver messages exactly once
         when Kafka Connect workers undergo bounces, both clean and unclean.
@@ -641,8 +642,8 @@ class ConnectDistributedTest(Test):
         assert success, "Found validation errors:\n" + "\n ".join(errors)
 
     @cluster(num_nodes=6)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
-    def test_transformations(self, connect_protocol):
+    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_transformations(self, connect_protocol, metadata_quorum):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(timestamp_type='CreateTime', include_filestream_connectors=True)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))

@@ -15,6 +15,7 @@
 
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.connect import ConnectDistributedService, ConnectRestError, ConnectServiceBase
+from kafkatest.services.kafka import quorum
 from ducktape.utils.util import wait_until
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
@@ -78,8 +79,8 @@ class ConnectRestApiTest(KafkaTest):
                 include_filestream_connectors=True)
 
     @cluster(num_nodes=4)
-    @matrix(connect_protocol=['compatible', 'eager'])
-    def test_rest_api(self, connect_protocol):
+    @matrix(connect_protocol=['compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
+    def test_rest_api(self, connect_protocol, metadata_quorum):
         # Template parameters
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"

@@ -138,9 +138,11 @@ class ConnectStandaloneFileTest(Test):
             return False
 
     @cluster(num_nodes=5)
-    @parametrize(error_tolerance=ErrorTolerance.ALL)
-    @parametrize(error_tolerance=ErrorTolerance.NONE)
-    def test_skip_and_log_to_dlq(self, error_tolerance):
+    @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.zk)
+    @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.remote_kraft)
+    @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.remote_kraft)
+    @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.zk)
+    def test_skip_and_log_to_dlq(self, error_tolerance, metadata_quorum):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, topics=self.topics)
 
         # set config props
@@ -171,7 +173,8 @@ class ConnectStandaloneFileTest(Test):
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE],
                 include_filestream_connectors=True)
 
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
         self.kafka.start()
 
         self.override_key_converter = "org.apache.kafka.connect.storage.StringConverter"