KAFKA-18678 Update TestVerifiableProducer system test (#18768)

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2025-02-03 14:14:54 +08:00 committed by GitHub
parent 9d6faf0283
commit 5268fcdc98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 5 additions and 11 deletions

View File

@ -21,7 +21,6 @@ from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils import is_version
from kafkatest.version import DEV_BRANCH, KafkaVersion
@ -32,25 +31,20 @@ class TestVerifiableProducer(Test):
super(TestVerifiableProducer, self).__init__(test_context)
self.topic = "topic"
self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
self.kafka = KafkaService(test_context, num_nodes=1, zk=None,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
self.num_messages = 1000
# This will produce to source kafka cluster
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
max_messages=self.num_messages, throughput=self.num_messages // 10)
def setUp(self):
if self.zk:
self.zk.start()
@cluster(num_nodes=3)
@matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"], enable_idempotence=[False])
@matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"], enable_idempotence=[True])
@matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all)
@cluster(num_nodes=4)
@matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"], enable_idempotence=[False], metadata_quorum=quorum.all_kraft)
@matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"], enable_idempotence=[True], metadata_quorum=quorum.all_kraft)
@matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_kraft)
@matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'GSSAPI'],
metadata_quorum=quorum.all)
metadata_quorum=quorum.all_kraft)
def test_simple_run(self, producer_version, acks=None, enable_idempotence=False, security_protocol = 'PLAINTEXT',
sasl_mechanism='PLAIN', metadata_quorum=quorum.zk):
"""