mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-17915: Convert Kafka Client system tests to use KRaft (#17669)
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
		
							parent
							
								
									39069bbad2
								
							
						
					
					
						commit
						bc782d0fb7
					
				|  | @ -84,7 +84,7 @@ class OffsetValidationTest(VerifiableConsumerTest): | ||||||
|         use_new_coordinator=[True], |         use_new_coordinator=[True], | ||||||
|         group_protocol=consumer_group.all_group_protocols |         group_protocol=consumer_group.all_group_protocols | ||||||
|     ) |     ) | ||||||
|     def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): |     def test_broker_rolling_bounce(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): | ||||||
|         """ |         """ | ||||||
|         Verify correct consumer behavior when the brokers are consecutively restarted. |         Verify correct consumer behavior when the brokers are consecutively restarted. | ||||||
| 
 | 
 | ||||||
|  | @ -143,7 +143,7 @@ class OffsetValidationTest(VerifiableConsumerTest): | ||||||
|         use_new_coordinator=[True], |         use_new_coordinator=[True], | ||||||
|         group_protocol=consumer_group.all_group_protocols |         group_protocol=consumer_group.all_group_protocols | ||||||
|     ) |     ) | ||||||
|     def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): |     def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): | ||||||
|         """ |         """ | ||||||
|         Verify correct consumer behavior when the consumers in the group are consecutively restarted. |         Verify correct consumer behavior when the consumers in the group are consecutively restarted. | ||||||
| 
 | 
 | ||||||
|  | @ -202,7 +202,7 @@ class OffsetValidationTest(VerifiableConsumerTest): | ||||||
|         use_new_coordinator=[True], |         use_new_coordinator=[True], | ||||||
|         group_protocol=[consumer_group.classic_group_protocol] |         group_protocol=[consumer_group.classic_group_protocol] | ||||||
|     ) |     ) | ||||||
|     def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): |     def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): | ||||||
|         """ |         """ | ||||||
|         Verify correct static consumer behavior when the consumers in the group are restarted. In order to make  |         Verify correct static consumer behavior when the consumers in the group are restarted. In order to make  | ||||||
|         sure the behavior of static members are different from dynamic ones, we take both static and dynamic |         sure the behavior of static members are different from dynamic ones, we take both static and dynamic | ||||||
|  | @ -275,7 +275,7 @@ class OffsetValidationTest(VerifiableConsumerTest): | ||||||
|         use_new_coordinator=[True], |         use_new_coordinator=[True], | ||||||
|         group_protocol=consumer_group.all_group_protocols |         group_protocol=consumer_group.all_group_protocols | ||||||
|     ) |     ) | ||||||
|     def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): |     def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): | ||||||
|         """ |         """ | ||||||
|         Verify that the updated member.id(updated_member_id) caused by static member rejoin would be persisted. If not, |         Verify that the updated member.id(updated_member_id) caused by static member rejoin would be persisted. If not, | ||||||
|         after the brokers rolling bounce, the migrated group coordinator would load the stale persisted member.id and |         after the brokers rolling bounce, the migrated group coordinator would load the stale persisted member.id and | ||||||
|  | @ -317,7 +317,7 @@ class OffsetValidationTest(VerifiableConsumerTest): | ||||||
|         use_new_coordinator=[True], |         use_new_coordinator=[True], | ||||||
|         group_protocol=consumer_group.all_group_protocols |         group_protocol=consumer_group.all_group_protocols | ||||||
|     ) |     ) | ||||||
|     def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): |     def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): | ||||||
|         """ |         """ | ||||||
|         Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id. |         Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id. | ||||||
| 
 | 
 | ||||||
|  | @ -401,7 +401,7 @@ class OffsetValidationTest(VerifiableConsumerTest): | ||||||
|         use_new_coordinator=[True], |         use_new_coordinator=[True], | ||||||
|         group_protocol=consumer_group.all_group_protocols |         group_protocol=consumer_group.all_group_protocols | ||||||
|     ) |     ) | ||||||
|     def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): |     def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): | ||||||
|         partition = TopicPartition(self.TOPIC, 0) |         partition = TopicPartition(self.TOPIC, 0) | ||||||
| 
 | 
 | ||||||
|         consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol) |         consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol) | ||||||
|  | @ -459,7 +459,7 @@ class OffsetValidationTest(VerifiableConsumerTest): | ||||||
|         use_new_coordinator=[True], |         use_new_coordinator=[True], | ||||||
|         group_protocol=consumer_group.all_group_protocols |         group_protocol=consumer_group.all_group_protocols | ||||||
|     ) |     ) | ||||||
|     def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): |     def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): | ||||||
|         partition = TopicPartition(self.TOPIC, 0) |         partition = TopicPartition(self.TOPIC, 0) | ||||||
| 
 | 
 | ||||||
|         consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol) |         consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol) | ||||||
|  | @ -505,7 +505,7 @@ class OffsetValidationTest(VerifiableConsumerTest): | ||||||
|         use_new_coordinator=[True], |         use_new_coordinator=[True], | ||||||
|         group_protocol=consumer_group.all_group_protocols |         group_protocol=consumer_group.all_group_protocols | ||||||
|     ) |     ) | ||||||
|     def test_group_consumption(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): |     def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): | ||||||
|         """ |         """ | ||||||
|         Verifies correct group rebalance behavior as consumers are started and stopped. |         Verifies correct group rebalance behavior as consumers are started and stopped. | ||||||
|         In particular, this test verifies that the partition is readable after every |         In particular, this test verifies that the partition is readable after every | ||||||
|  | @ -570,7 +570,7 @@ class AssignmentValidationTest(VerifiableConsumerTest): | ||||||
|                              "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"], |                              "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"], | ||||||
|         metadata_quorum=[quorum.isolated_kraft], |         metadata_quorum=[quorum.isolated_kraft], | ||||||
|         use_new_coordinator=[True], |         use_new_coordinator=[True], | ||||||
|         group_protocol=[consumer_group.classic_group_protocol], |         group_protocol=[consumer_group.classic_group_protocol] | ||||||
|     ) |     ) | ||||||
|     @matrix( |     @matrix( | ||||||
|         metadata_quorum=[quorum.isolated_kraft], |         metadata_quorum=[quorum.isolated_kraft], | ||||||
|  | @ -578,7 +578,7 @@ class AssignmentValidationTest(VerifiableConsumerTest): | ||||||
|         group_protocol=[consumer_group.consumer_group_protocol], |         group_protocol=[consumer_group.consumer_group_protocol], | ||||||
|         group_remote_assignor=consumer_group.all_remote_assignors |         group_remote_assignor=consumer_group.all_remote_assignors | ||||||
|     ) |     ) | ||||||
|     def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None): |     def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None): | ||||||
|         """ |         """ | ||||||
|         Verify assignment strategy correctness: each partition is assigned to exactly |         Verify assignment strategy correctness: each partition is assigned to exactly | ||||||
|         one consumer instance. |         one consumer instance. | ||||||
|  |  | ||||||
|  | @ -634,15 +634,15 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         String groupProtocol = res.getString("groupProtocol"); |         GroupProtocol groupProtocol = GroupProtocol.of(res.getString("groupProtocol")); | ||||||
|  |         consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); | ||||||
| 
 | 
 | ||||||
|         // 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol. |         // 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol. | ||||||
|         // The two implementations use slightly different configuration, hence these arguments are conditional. |         // The two implementations use slightly different configuration, hence these arguments are conditional. | ||||||
|         // |         // | ||||||
|         // See the Python class/method VerifiableConsumer.start_cmd() in verifiable_consumer.py for how the |         // See the Python class/method VerifiableConsumer.start_cmd() in verifiable_consumer.py for how the | ||||||
|         // command line arguments are passed in by the system test framework. |         // command line arguments are passed in by the system test framework. | ||||||
|         if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name())) { |         if (groupProtocol == GroupProtocol.CONSUMER) { | ||||||
|             consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); |  | ||||||
|             String groupRemoteAssignor = res.getString("groupRemoteAssignor"); |             String groupRemoteAssignor = res.getString("groupRemoteAssignor"); | ||||||
| 
 | 
 | ||||||
|             if (groupRemoteAssignor != null) |             if (groupRemoteAssignor != null) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue