mirror of https://github.com/apache/kafka.git
KAFKA-16439: Update replication_replica_failure_test.py to support KIP-848’s group protocol config (#15629)
Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢
Reviewers: Walker Carlson <wcarlson@apache.org>
This commit is contained in:
parent
6bb9caced0
commit
c7ef80bb6c
|
@ -20,7 +20,7 @@ from ducktape.mark import matrix
|
||||||
from ducktape.mark import parametrize
|
from ducktape.mark import parametrize
|
||||||
from ducktape.mark.resource import cluster
|
from ducktape.mark.resource import cluster
|
||||||
|
|
||||||
from kafkatest.services.kafka import quorum
|
from kafkatest.services.kafka import quorum, consumer_group
|
||||||
from kafkatest.tests.end_to_end import EndToEndTest
|
from kafkatest.tests.end_to_end import EndToEndTest
|
||||||
from kafkatest.services.kafka import config_property
|
from kafkatest.services.kafka import config_property
|
||||||
from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
|
from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
|
||||||
|
@ -38,14 +38,15 @@ class ReplicationReplicaFailureTest(EndToEndTest):
|
||||||
|
|
||||||
@cluster(num_nodes=7)
|
@cluster(num_nodes=7)
|
||||||
@matrix(
|
@matrix(
|
||||||
metadata_quorum=[quorum.zk],
|
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
|
||||||
use_new_coordinator=[False]
|
use_new_coordinator=[False]
|
||||||
)
|
)
|
||||||
@matrix(
|
@matrix(
|
||||||
metadata_quorum=[quorum.isolated_kraft],
|
metadata_quorum=[quorum.isolated_kraft],
|
||||||
use_new_coordinator=[True, False]
|
use_new_coordinator=[True],
|
||||||
|
group_protocol=consumer_group.all_group_protocols
|
||||||
)
|
)
|
||||||
def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
|
def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||||
"""
|
"""
|
||||||
This test verifies that replication shrinks the ISR when a replica is not fetching anymore.
|
This test verifies that replication shrinks the ISR when a replica is not fetching anymore.
|
||||||
It also verifies that replication provides simple durability guarantees by checking that data acked by
|
It also verifies that replication provides simple durability guarantees by checking that data acked by
|
||||||
|
@ -90,7 +91,7 @@ class ReplicationReplicaFailureTest(EndToEndTest):
|
||||||
self.create_producer()
|
self.create_producer()
|
||||||
self.producer.start()
|
self.producer.start()
|
||||||
|
|
||||||
self.create_consumer()
|
self.create_consumer(group_protocol=group_protocol)
|
||||||
self.consumer.start()
|
self.consumer.start()
|
||||||
|
|
||||||
self.await_startup()
|
self.await_startup()
|
||||||
|
|
Loading…
Reference in New Issue