mirror of https://github.com/apache/kafka.git
KAFKA-17915: Convert Kafka Client system tests to use KRaft (#17669)
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
d7e5d0a59b
commit
45e3c21e9a
|
@ -84,7 +84,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
|||
use_new_coordinator=[True],
|
||||
group_protocol=consumer_group.all_group_protocols
|
||||
)
|
||||
def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||
def test_broker_rolling_bounce(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
|
||||
"""
|
||||
Verify correct consumer behavior when the brokers are consecutively restarted.
|
||||
|
||||
|
@ -143,7 +143,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
|||
use_new_coordinator=[True],
|
||||
group_protocol=consumer_group.all_group_protocols
|
||||
)
|
||||
def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||
def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
|
||||
"""
|
||||
Verify correct consumer behavior when the consumers in the group are consecutively restarted.
|
||||
|
||||
|
@ -202,7 +202,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
|||
use_new_coordinator=[True],
|
||||
group_protocol=[consumer_group.classic_group_protocol]
|
||||
)
|
||||
def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||
def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
|
||||
"""
|
||||
Verify correct static consumer behavior when the consumers in the group are restarted. In order to make
|
||||
sure the behavior of static members are different from dynamic ones, we take both static and dynamic
|
||||
|
@ -275,7 +275,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
|||
use_new_coordinator=[True],
|
||||
group_protocol=consumer_group.all_group_protocols
|
||||
)
|
||||
def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||
def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
|
||||
"""
|
||||
Verify that the updated member.id(updated_member_id) caused by static member rejoin would be persisted. If not,
|
||||
after the brokers rolling bounce, the migrated group coordinator would load the stale persisted member.id and
|
||||
|
@ -317,7 +317,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
|||
use_new_coordinator=[True],
|
||||
group_protocol=consumer_group.all_group_protocols
|
||||
)
|
||||
def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||
def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
|
||||
"""
|
||||
Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id.
|
||||
|
||||
|
@ -401,7 +401,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
|||
use_new_coordinator=[True],
|
||||
group_protocol=consumer_group.all_group_protocols
|
||||
)
|
||||
def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||
def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
|
||||
partition = TopicPartition(self.TOPIC, 0)
|
||||
|
||||
consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol)
|
||||
|
@ -459,7 +459,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
|||
use_new_coordinator=[True],
|
||||
group_protocol=consumer_group.all_group_protocols
|
||||
)
|
||||
def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||
def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
|
||||
partition = TopicPartition(self.TOPIC, 0)
|
||||
|
||||
consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol)
|
||||
|
@ -505,7 +505,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
|||
use_new_coordinator=[True],
|
||||
group_protocol=consumer_group.all_group_protocols
|
||||
)
|
||||
def test_group_consumption(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
|
||||
def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
|
||||
"""
|
||||
Verifies correct group rebalance behavior as consumers are started and stopped.
|
||||
In particular, this test verifies that the partition is readable after every
|
||||
|
@ -570,7 +570,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
|
|||
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
|
||||
metadata_quorum=[quorum.isolated_kraft],
|
||||
use_new_coordinator=[True],
|
||||
group_protocol=[consumer_group.classic_group_protocol],
|
||||
group_protocol=[consumer_group.classic_group_protocol]
|
||||
)
|
||||
@matrix(
|
||||
metadata_quorum=[quorum.isolated_kraft],
|
||||
|
@ -578,7 +578,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
|
|||
group_protocol=[consumer_group.consumer_group_protocol],
|
||||
group_remote_assignor=consumer_group.all_remote_assignors
|
||||
)
|
||||
def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None):
|
||||
def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None):
|
||||
"""
|
||||
Verify assignment strategy correctness: each partition is assigned to exactly
|
||||
one consumer instance.
|
||||
|
|
|
@ -634,15 +634,15 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
|
|||
}
|
||||
}
|
||||
|
||||
String groupProtocol = res.getString("groupProtocol");
|
||||
GroupProtocol groupProtocol = GroupProtocol.of(res.getString("groupProtocol"));
|
||||
consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
|
||||
|
||||
// 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol.
|
||||
// The two implementations use slightly different configuration, hence these arguments are conditional.
|
||||
//
|
||||
// See the Python class/method VerifiableConsumer.start_cmd() in verifiable_consumer.py for how the
|
||||
// command line arguments are passed in by the system test framework.
|
||||
if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
|
||||
consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
|
||||
if (groupProtocol == GroupProtocol.CONSUMER) {
|
||||
String groupRemoteAssignor = res.getString("groupRemoteAssignor");
|
||||
|
||||
if (groupRemoteAssignor != null)
|
||||
|
|
Loading…
Reference in New Issue