KAFKA-16272: Adding new coordinator related changes for connect_distributed.py (#15594)

Summary of the changes:

Parameterizes the tests to use the new coordinator and to pass in the consumer group protocol. This is applicable to sink connectors only.
Enhances the sink connector creation code in system tests to accept a new optional parameter for consumer group protocol to be used.
Sets the consumer group protocol via a consumer.override.-prefixed config when the new group coordinator is enabled.
Note about testing: There are 288 tests that need to be run, and running them locally takes a lot of time. I will try to post the test results once I have a full run.

Reviewers: Kirk True <ktrue@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>, Philip Nee <pnee@confluent.io>
This commit is contained in:
vamossagar12 2024-04-19 20:59:50 +05:30 committed by GitHub
parent b87cd66dab
commit f22ad6645b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 54 additions and 38 deletions

View File

@@ -519,12 +519,13 @@ class VerifiableSink(VerifiableConnector):
Helper class for running a verifiable sink connector on a Kafka Connect cluster and analyzing the output.
"""
def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"]):
def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"], consumer_group_protocol=None):
self.cc = cc
self.logger = self.cc.logger
self.name = name
self.tasks = tasks
self.topics = topics
self.consumer_group_protocol = consumer_group_protocol
def flushed_messages(self):
return list(filter(lambda m: 'flushed' in m and m['flushed'], self.messages()))
@@ -534,33 +535,40 @@ class VerifiableSink(VerifiableConnector):
def start(self):
self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
self.cc.create_connector({
connector_config = {
'name': self.name,
'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector',
'tasks.max': self.tasks,
'topics': ",".join(self.topics)
})
}
if self.consumer_group_protocol is not None:
connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol
self.cc.create_connector(connector_config)
class MockSink(object):
def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", consumer_group_protocol=None):
self.cc = cc
self.logger = self.cc.logger
self.name = name
self.mode = mode
self.delay_sec = delay_sec
self.topics = topics
self.consumer_group_protocol = consumer_group_protocol
def start(self):
self.logger.info("Creating connector MockSinkConnector %s", self.name)
self.cc.create_connector({
connector_config = {
'name': self.name,
'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
'tasks.max': 1,
'topics': ",".join(self.topics),
'mock_mode': self.mode,
'delay_ms': self.delay_sec * 1000
})
}
if self.consumer_group_protocol is not None:
connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol
self.cc.create_connector(connector_config)
class MockSource(object):

View File

@@ -108,9 +108,10 @@ class ConnectDistributedTest(Test):
self.zk.start()
self.kafka.start()
def _start_connector(self, config_file):
def _start_connector(self, config_file, extra_config={}):
connector_props = self.render(config_file)
connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
connector_config.update(extra_config)
self.cc.create_connector(connector_config)
def _connector_status(self, connector, node=None):
@@ -174,16 +175,17 @@ class ConnectDistributedTest(Test):
@matrix(
exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.zk],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False]
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False):
def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
@@ -193,7 +195,7 @@ class ConnectDistributedTest(Test):
if exactly_once_source:
self.connector = MockSource(self.cc, mode='connector-failure', delay_sec=5)
else:
self.connector = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5)
self.connector = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5, consumer_group_protocol=group_protocol)
self.connector.start()
wait_until(lambda: self.connector_is_failed(self.connector), timeout_sec=15,
@@ -208,16 +210,17 @@ class ConnectDistributedTest(Test):
@matrix(
connector_type=['source', 'exactly-once source', 'sink'],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.zk],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
connector_type=['source', 'exactly-once source', 'sink'],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False]
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False):
def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 'exactly-once source' else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
@@ -226,7 +229,7 @@ class ConnectDistributedTest(Test):
connector = None
if connector_type == "sink":
connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5)
connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5, consumer_group_protocol=group_protocol)
else:
connector = MockSource(self.cc, mode='task-failure', delay_sec=5)
@@ -244,21 +247,22 @@ class ConnectDistributedTest(Test):
@cluster(num_nodes=5)
@matrix(
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.zk],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False]
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum, use_new_coordinator=False):
def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
self.cc.start()
self.sink = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5)
self.sink = MockSink(self.cc, self.topics.keys(), mode='connector-failure', delay_sec=5, consumer_group_protocol=group_protocol)
self.sink.start()
wait_until(lambda: self.connector_is_failed(self.sink), timeout_sec=15,
@@ -273,16 +277,17 @@ class ConnectDistributedTest(Test):
@matrix(
connector_type=['source', 'sink'],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.zk],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
connector_type=['source', 'sink'],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False]
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False):
def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -290,7 +295,7 @@ class ConnectDistributedTest(Test):
connector = None
if connector_type == "sink":
connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5)
connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5, consumer_group_protocol=group_protocol)
else:
connector = MockSource(self.cc, mode='task-failure', delay_sec=5)
@@ -361,15 +366,16 @@ class ConnectDistributedTest(Test):
@cluster(num_nodes=5)
@matrix(
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.zk],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False]
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, use_new_coordinator=False):
def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Verify that sink connectors stop consuming records when paused and begin again after
being resumed.
@@ -387,7 +393,7 @@ class ConnectDistributedTest(Test):
wait_until(lambda: len(self.source.committed_messages()) > 0, timeout_sec=30,
err_msg="Timeout expired waiting for source task to produce a message")
self.sink = VerifiableSink(self.cc, topics=[self.TOPIC])
self.sink = VerifiableSink(self.cc, topics=[self.TOPIC], consumer_group_protocol=group_protocol)
self.sink.start()
wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
@@ -637,7 +643,7 @@ class ConnectDistributedTest(Test):
security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.zk],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
@@ -645,9 +651,10 @@ class ConnectDistributedTest(Test):
exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False]
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False):
def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
@@ -662,6 +669,9 @@ class ConnectDistributedTest(Test):
self.logger.info("Creating connectors")
self._start_connector("connect-file-source.properties")
if group_protocol is not None:
self._start_connector("connect-file-sink.properties", {"consumer.override.group.protocol" : group_protocol})
else:
self._start_connector("connect-file-sink.properties")
# Generating data on the source node should generate new records and create new output on the sink node. Timeouts
@@ -708,7 +718,7 @@ class ConnectDistributedTest(Test):
self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100)
self.source.start()
self.sink = VerifiableSink(self.cc, tasks=num_tasks, topics=[self.TOPIC])
self.sink = VerifiableSink(self.cc, tasks=num_tasks, topics=[self.TOPIC], consumer_group_protocol=group_protocol)
self.sink.start()
for i in range(3):
@@ -816,17 +826,16 @@ class ConnectDistributedTest(Test):
@matrix(
clean=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
metadata_quorum=[quorum.zk],
use_new_coordinator=[False]
)
@matrix(
clean=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
use_new_coordinator=[True, False]
)
def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum, use_new_coordinator=False):
"""
Validates that source tasks run correctly and deliver messages exactly once
when Kafka Connect workers undergo bounces, both clean and unclean.
@@ -881,8 +890,7 @@ class ConnectDistributedTest(Test):
self.source.stop()
self.cc.stop()
consumer_properties = consumer_group.maybe_set_group_protocol(group_protocol)
consumer = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, isolation_level="read_committed", consumer_properties=consumer_properties)
consumer = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, isolation_level="read_committed")
consumer.run()
src_messages = consumer.messages_consumed[1]