diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 25249a7ab22..ff106425005 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1795,23 +1795,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): cmd += " --state %s" % state return self.run_cli_tool(node, cmd) - def set_share_group_offset_reset_strategy(self, group, strategy=None, node=None): - """ Set the offset reset strategy config for the given group. - """ - if strategy is None: - return - if node is None: - node = self.nodes[0] - consumer_group_script = self.path.script("kafka-configs.sh", node) - - cmd = fix_opts_for_new_jvm(node) - cmd += "%s --bootstrap-server %s --group %s --alter --add-config \"share.auto.offset.reset=%s\"" % \ - (consumer_group_script, - self.bootstrap_servers(self.security_protocol), - group, - strategy) - return "Completed" in self.run_cli_tool(node, cmd) - def describe_consumer_group(self, group, node=None, command_config=None): """ Describe a consumer group. """ diff --git a/tests/kafkatest/services/verifiable_share_consumer.py b/tests/kafkatest/services/verifiable_share_consumer.py index 1ccbca121cf..016a8be8092 100644 --- a/tests/kafkatest/services/verifiable_share_consumer.py +++ b/tests/kafkatest/services/verifiable_share_consumer.py @@ -107,10 +107,9 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, group_id, - max_messages=-1, acknowledgement_mode="auto", offset_reset_strategy="", - version=DEV_BRANCH, stop_timeout_sec=60, log_level="INFO", jaas_override_variables=None, - on_record_consumed=None): + def __init__(self, context, num_nodes, kafka, topic, group_id, max_messages=-1, + acknowledgement_mode="auto", version=DEV_BRANCH, stop_timeout_sec=60, + log_level="INFO", jaas_override_variables=None, on_record_consumed=None): """ :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file """ @@ -119,7 +118,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac self.kafka = kafka self.topic = topic self.group_id = group_id - self.offset_reset_strategy = offset_reset_strategy self.max_messages = max_messages self.acknowledgement_mode = acknowledgement_mode self.prop_file = "" @@ -134,7 +132,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac self.total_records_acknowledged_failed = 0 self.consumed_records_offsets = set() self.acknowledged_records_offsets = set() - self.is_offset_reset_strategy_set = False for node in self.nodes: node.version = version @@ -186,8 +183,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac self._update_global_consumed(event) elif name == "record_data" and self.on_record_consumed: self.on_record_consumed(event, node) - elif name == "offset_reset_strategy_set": - self._on_offset_reset_strategy_set() else: self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event)) @@ -213,9 +208,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac if key not in self.consumed_records_offsets: self.consumed_records_offsets.add(key) - def _on_offset_reset_strategy_set(self): - self.is_offset_reset_strategy_set = True - def start_cmd(self, node): cmd = "" cmd += "export LOG_DIR=%s;" % VerifiableShareConsumer.LOG_DIR @@ -227,8 +219,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac cmd += " --acknowledgement-mode %s" % self.acknowledgement_mode - cmd += " --offset-reset-strategy %s" % self.offset_reset_strategy - cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol) cmd += " --group-id %s --topic %s" % (self.group_id, self.topic) @@ -315,10 +305,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac with self.lock: return self.event_handlers[node].total_acknowledged_failed - def offset_reset_strategy_set(self): - with self.lock: - return self.is_offset_reset_strategy_set - def dead_nodes(self): with self.lock: return [handler.node for handler in self.event_handlers.values() diff --git a/tests/kafkatest/tests/client/share_consumer_test.py b/tests/kafkatest/tests/client/share_consumer_test.py index 916dcb2dfab..3da722b485c 100644 --- a/tests/kafkatest/tests/client/share_consumer_test.py +++ b/tests/kafkatest/tests/client/share_consumer_test.py @@ -31,6 +31,8 @@ class ShareConsumerTest(VerifiableShareConsumerTest): num_producers = 1 num_brokers = 3 + share_group_id = "test_group_id" + default_timeout_sec = 600 def __init__(self, test_context): @@ -41,7 +43,7 @@ class ShareConsumerTest(VerifiableShareConsumerTest): }) def setup_share_group(self, topic, **kwargs): - consumer = super(ShareConsumerTest, self).setup_share_group(topic, **kwargs) + consumer = super(ShareConsumerTest, self).setup_share_group(topic, group_id=self.share_group_id, **kwargs) self.mark_for_collect(consumer, 'verifiable_share_consumer_stdout') return consumer @@ -124,7 +126,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest): total_messages = 100000 producer = self.setup_producer(self.TOPIC1["name"], max_messages=total_messages) - consumer = self.setup_share_group(self.TOPIC1["name"], offset_reset_strategy="earliest") + consumer = self.setup_share_group(self.TOPIC1["name"]) + + wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"), + timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest") producer.start() @@ -153,7 +158,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest): total_messages = 1000000 producer = self.setup_producer(self.TOPIC2["name"], max_messages=total_messages, throughput=5000) - consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") + consumer = self.setup_share_group(self.TOPIC2["name"]) + + wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"), + timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest") producer.start() @@ -181,7 +189,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest): def test_broker_rolling_bounce(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, use_share_groups=True): producer = self.setup_producer(self.TOPIC2["name"]) - consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") + consumer = self.setup_share_group(self.TOPIC2["name"]) + + wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"), + timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest") producer.start() self.await_produced_messages(producer, timeout_sec=self.default_timeout_sec) @@ -213,7 +224,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest): def test_broker_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1, use_share_groups=True): producer = self.setup_producer(self.TOPIC2["name"]) - consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") + consumer = self.setup_share_group(self.TOPIC2["name"]) + + wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"), + timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest") producer.start() self.await_produced_messages(producer, timeout_sec=self.default_timeout_sec) @@ -256,7 +270,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest): """ producer = self.setup_producer(self.TOPIC2["name"]) - consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") + consumer = self.setup_share_group(self.TOPIC2["name"]) + + wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"), + timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest") producer.start() self.await_produced_messages(producer, timeout_sec=self.default_timeout_sec) @@ -287,7 +304,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest): def test_share_consumer_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_consumers=1, use_share_groups=True): producer = self.setup_producer(self.TOPIC2["name"]) - consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") + consumer = self.setup_share_group(self.TOPIC2["name"]) + + wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"), + timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest") # startup the producer and ensure that some records have been written producer.start() diff --git a/tests/kafkatest/tests/verifiable_share_consumer_test.py b/tests/kafkatest/tests/verifiable_share_consumer_test.py index 263a40e62fb..37e1c56d11b 100644 --- a/tests/kafkatest/tests/verifiable_share_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_share_consumer_test.py @@ -49,10 +49,10 @@ class VerifiableShareConsumerTest(KafkaTest): """Override this since we're adding services outside of the constructor""" return super(VerifiableShareConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers - def setup_share_group(self, topic, acknowledgement_mode="auto", group_id="test_group_id", offset_reset_strategy="", **kwargs): + def setup_share_group(self, topic, acknowledgement_mode="auto", group_id="test_group_id", **kwargs): return VerifiableShareConsumer(self.test_context, self.num_consumers, self.kafka, - topic, group_id, acknowledgement_mode=acknowledgement_mode, - offset_reset_strategy=offset_reset_strategy, log_level="TRACE", **kwargs) + topic, group_id, acknowledgement_mode=acknowledgement_mode, + log_level="TRACE", **kwargs) def setup_producer(self, topic, max_messages=-1, throughput=500): return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,