mirror of https://github.com/apache/kafka.git
MINOR: update truncation test (#18952)
Reduce the minISR to be 1 for the truncation test in order to skip the protection from KIP-966 Reviewers: David Jacot <djacot@confluent.io>, Colin P. McCabe <cmccabe@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
48a506b7b8
commit
10da082184
|
@ -1359,22 +1359,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
self.logger.info("Running alter message format command...\n%s" % cmd)
|
self.logger.info("Running alter message format command...\n%s" % cmd)
|
||||||
node.account.ssh(cmd)
|
node.account.ssh(cmd)
|
||||||
|
|
||||||
def set_unclean_leader_election(self, topic, value=True, node=None):
|
|
||||||
if node is None:
|
|
||||||
node = self.nodes[0]
|
|
||||||
if value is True:
|
|
||||||
self.logger.info("Enabling unclean leader election for topic %s", topic)
|
|
||||||
else:
|
|
||||||
self.logger.info("Disabling unclean leader election for topic %s", topic)
|
|
||||||
|
|
||||||
force_use_zk_connection = not self.all_nodes_configs_command_uses_bootstrap_server()
|
|
||||||
|
|
||||||
cmd = fix_opts_for_new_jvm(node)
|
|
||||||
cmd += "%s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
|
|
||||||
(self.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), topic, str(value).lower())
|
|
||||||
self.logger.info("Running alter unclean leader command...\n%s" % cmd)
|
|
||||||
node.account.ssh(cmd)
|
|
||||||
|
|
||||||
def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
|
def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
|
||||||
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
|
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
|
||||||
raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
|
raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
|
||||||
|
@ -1620,11 +1604,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
describe_output = self.describe_topic(topic, node, offline_nodes=offline_nodes)
|
describe_output = self.describe_topic(topic, node, offline_nodes=offline_nodes)
|
||||||
self.logger.debug(describe_output)
|
self.logger.debug(describe_output)
|
||||||
requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output)
|
requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output)
|
||||||
# e.g. Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
|
# e.g. Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 Elr: 4 LastKnownElr: 5
|
||||||
if not requested_partition_line:
|
if not requested_partition_line:
|
||||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||||
isr_csv = requested_partition_line.split()[9] # 10th column from above
|
isr_csv = requested_partition_line.split()[9] # 10th column from above
|
||||||
isr_idx_list = [int(i) for i in isr_csv.split(",")]
|
if isr_csv == "Elr:":
|
||||||
|
isr_idx_list = []
|
||||||
|
else:
|
||||||
|
isr_idx_list = [int(i) for i in isr_csv.split(",")]
|
||||||
|
|
||||||
self.logger.info("Isr for topic %s and partition %d is now: %s" % (topic, partition, isr_idx_list))
|
self.logger.info("Isr for topic %s and partition %d is now: %s" % (topic, partition, isr_idx_list))
|
||||||
return isr_idx_list
|
return isr_idx_list
|
||||||
|
|
|
@ -27,7 +27,9 @@ class TruncationTest(VerifiableConsumerTest):
|
||||||
TOPICS = {
|
TOPICS = {
|
||||||
TOPIC: {
|
TOPIC: {
|
||||||
'partitions': NUM_PARTITIONS,
|
'partitions': NUM_PARTITIONS,
|
||||||
'replication-factor': 2
|
'replication-factor': 2,
|
||||||
|
'configs': {"min.insync.replicas": 1,
|
||||||
|
"unclean.leader.election.enable": True}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
GROUP_ID = "truncation-test"
|
GROUP_ID = "truncation-test"
|
||||||
|
@ -80,6 +82,9 @@ class TruncationTest(VerifiableConsumerTest):
|
||||||
isr = self.kafka.isr_idx_list(self.TOPIC, 0)
|
isr = self.kafka.isr_idx_list(self.TOPIC, 0)
|
||||||
node1 = self.kafka.get_node(isr[0])
|
node1 = self.kafka.get_node(isr[0])
|
||||||
self.kafka.stop_node(node1)
|
self.kafka.stop_node(node1)
|
||||||
|
wait_until(lambda: len(self.kafka.isr_idx_list(self.TOPIC, 0)) == 1,
|
||||||
|
timeout_sec=30,
|
||||||
|
err_msg="The ISR update taking too long")
|
||||||
self.logger.info("Reduced ISR to one node, consumer is at %s", consumer.current_position(tp))
|
self.logger.info("Reduced ISR to one node, consumer is at %s", consumer.current_position(tp))
|
||||||
|
|
||||||
# Ensure remaining ISR member has a little bit of data
|
# Ensure remaining ISR member has a little bit of data
|
||||||
|
@ -112,7 +117,10 @@ class TruncationTest(VerifiableConsumerTest):
|
||||||
|
|
||||||
pre_truncation_pos = consumer.current_position(tp)
|
pre_truncation_pos = consumer.current_position(tp)
|
||||||
|
|
||||||
self.kafka.set_unclean_leader_election(self.TOPIC)
|
wait_until(lambda: len(self.kafka.isr_idx_list(self.TOPIC, 0)) == 1,
|
||||||
|
timeout_sec=30,
|
||||||
|
err_msg="The unclean leader election takes too long")
|
||||||
|
|
||||||
self.logger.info("New unclean leader, consumer is at %s", consumer.current_position(tp))
|
self.logger.info("New unclean leader, consumer is at %s", consumer.current_position(tp))
|
||||||
|
|
||||||
# Wait for truncation to be detected
|
# Wait for truncation to be detected
|
||||||
|
|
Loading…
Reference in New Issue