KAFKA-16276: Update transactions_test.py to support KIP-848’s group protocol config (#15567)

Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Kirk True 2024-03-22 06:58:40 -07:00 committed by GitHub
parent f66610095c
commit 159d25a7df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 22 additions and 11 deletions

View File

@ -14,7 +14,7 @@
# limitations under the License.
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.transactional_message_copier import TransactionalMessageCopier
@ -85,8 +85,8 @@ class TransactionsTest(Test):
(self.num_seed_messages, seed_timeout_sec))
return seed_producer.acked
def get_messages_from_topic(self, topic, num_messages):
consumer = self.start_consumer(topic, group_id="verifying_consumer")
def get_messages_from_topic(self, topic, num_messages, group_protocol):
consumer = self.start_consumer(topic, group_id="verifying_consumer", group_protocol=group_protocol)
return self.drain_consumer(consumer, num_messages)
def bounce_brokers(self, clean_shutdown):
@ -154,7 +154,7 @@ class TransactionsTest(Test):
))
return copiers
def start_consumer(self, topic_to_read, group_id):
def start_consumer(self, topic_to_read, group_id, group_protocol):
consumer = ConsoleConsumer(context=self.test_context,
num_nodes=1,
kafka=self.kafka,
@ -162,7 +162,8 @@ class TransactionsTest(Test):
group_id=group_id,
message_validator=is_int,
from_beginning=True,
isolation_level="read_committed")
isolation_level="read_committed",
consumer_properties=consumer_group.maybe_set_group_protocol(group_protocol))
consumer.start()
# ensure that the consumer is up.
wait_until(lambda: (len(consumer.messages_consumed[1]) > 0) == True,
@ -189,7 +190,7 @@ class TransactionsTest(Test):
def copy_messages_transactionally(self, failure_mode, bounce_target,
input_topic, output_topic,
num_copiers, num_messages_to_copy,
use_group_metadata):
use_group_metadata, group_protocol):
"""Copies messages transactionally from the seeded input topic to the
output topic, either bouncing brokers or clients in a hard and soft
way as it goes.
@ -204,7 +205,8 @@ class TransactionsTest(Test):
num_copiers=num_copiers,
use_group_metadata=use_group_metadata)
concurrent_consumer = self.start_consumer(output_topic,
group_id="concurrent_consumer")
group_id="concurrent_consumer",
group_protocol=group_protocol)
clean_shutdown = False
if failure_mode == "clean_bounce":
clean_shutdown = True
@ -257,9 +259,18 @@ class TransactionsTest(Test):
check_order=[True, False],
use_group_metadata=[True, False],
metadata_quorum=quorum.all_kraft,
use_new_coordinator=[True, False]
use_new_coordinator=[False]
)
def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False):
@matrix(
failure_mode=["hard_bounce", "clean_bounce"],
bounce_target=["brokers", "clients"],
check_order=[True, False],
use_group_metadata=[True, False],
metadata_quorum=quorum.all_kraft,
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
@ -283,8 +294,8 @@ class TransactionsTest(Test):
concurrently_consumed_messages = self.copy_messages_transactionally(
failure_mode, bounce_target, input_topic=self.input_topic,
output_topic=self.output_topic, num_copiers=self.num_input_partitions,
num_messages_to_copy=self.num_seed_messages, use_group_metadata=use_group_metadata)
output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages)
num_messages_to_copy=self.num_seed_messages, use_group_metadata=use_group_metadata, group_protocol=group_protocol)
output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages, group_protocol)
concurrently_consumed_message_set = set(concurrently_consumed_messages)
output_message_set = set(output_messages)