From 159d25a7df25975694e2e0eb18a8feb125f7c39e Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 22 Mar 2024 06:58:40 -0700 Subject: [PATCH] =?UTF-8?q?KAFKA-16276:=20Update=20transactions=5Ftest.py?= =?UTF-8?q?=20to=20support=20KIP-848=E2=80=99s=20group=20protocol=20config?= =?UTF-8?q?=20(#15567)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved. Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢 Reviewers: Lucas Brutschy --- .../kafkatest/tests/core/transactions_test.py | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index b9b39f355e4..ae649c37667 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -14,7 +14,7 @@ # limitations under the License. from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.kafka import KafkaService, quorum, consumer_group from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.transactional_message_copier import TransactionalMessageCopier @@ -85,8 +85,8 @@ class TransactionsTest(Test): (self.num_seed_messages, seed_timeout_sec)) return seed_producer.acked - def get_messages_from_topic(self, topic, num_messages): - consumer = self.start_consumer(topic, group_id="verifying_consumer") + def get_messages_from_topic(self, topic, num_messages, group_protocol): + consumer = self.start_consumer(topic, group_id="verifying_consumer", group_protocol=group_protocol) return self.drain_consumer(consumer, num_messages) def bounce_brokers(self, clean_shutdown): @@ -154,7 +154,7 @@ class TransactionsTest(Test): )) return copiers - def start_consumer(self, topic_to_read, group_id): + def start_consumer(self, topic_to_read, group_id, group_protocol): consumer = ConsoleConsumer(context=self.test_context, num_nodes=1, kafka=self.kafka, @@ -162,7 +162,8 @@ class TransactionsTest(Test): group_id=group_id, message_validator=is_int, from_beginning=True, - isolation_level="read_committed") + isolation_level="read_committed", + consumer_properties=consumer_group.maybe_set_group_protocol(group_protocol)) consumer.start() # ensure that the consumer is up. wait_until(lambda: (len(consumer.messages_consumed[1]) > 0) == True, @@ -189,7 +190,7 @@ class TransactionsTest(Test): def copy_messages_transactionally(self, failure_mode, bounce_target, input_topic, output_topic, num_copiers, num_messages_to_copy, - use_group_metadata): + use_group_metadata, group_protocol): """Copies messages transactionally from the seeded input topic to the output topic, either bouncing brokers or clients in a hard and soft way as it goes. @@ -204,7 +205,8 @@ class TransactionsTest(Test): num_copiers=num_copiers, use_group_metadata=use_group_metadata) concurrent_consumer = self.start_consumer(output_topic, - group_id="concurrent_consumer") + group_id="concurrent_consumer", + group_protocol=group_protocol) clean_shutdown = False if failure_mode == "clean_bounce": clean_shutdown = True @@ -257,9 +259,18 @@ class TransactionsTest(Test): check_order=[True, False], use_group_metadata=[True, False], metadata_quorum=quorum.all_kraft, - use_new_coordinator=[True, False] + use_new_coordinator=[False] ) - def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False): + @matrix( + failure_mode=["hard_bounce", "clean_bounce"], + bounce_target=["brokers", "clients"], + check_order=[True, False], + use_group_metadata=[True, False], + metadata_quorum=quorum.all_kraft, + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols + ) + def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol @@ -283,8 +294,8 @@ class TransactionsTest(Test): concurrently_consumed_messages = self.copy_messages_transactionally( failure_mode, bounce_target, input_topic=self.input_topic, output_topic=self.output_topic, num_copiers=self.num_input_partitions, - num_messages_to_copy=self.num_seed_messages, use_group_metadata=use_group_metadata) - output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages) + num_messages_to_copy=self.num_seed_messages, use_group_metadata=use_group_metadata, group_protocol=group_protocol) + output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages, group_protocol) concurrently_consumed_message_set = set(concurrently_consumed_messages) output_message_set = set(output_messages)