KAFKA-19358: Updated share_consumer_test.py tests to use set_share_group_offset_reset_strategy (#19878)
CI / build (push) Waiting to run Details

According to the current code in AK, the offset reset strategy for share
groups was set using the flag `--offset-reset-strategy` in the
share_consumer_test.py tests, but that would mean that the admin client
call would be sent out by all members in the share group. This PR
changes that by introducing the `set_share_group_offset_reset_strategy` method in
kafka.py, which runs the kafka-configs.sh script in one of the existing
docker containers, thereby changing the config only once.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Chirag Wadhwa 2025-06-03 01:43:22 +05:30 committed by GitHub
parent 32903a1873
commit c5a78b0186
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 33 additions and 44 deletions

View File

@ -1795,23 +1795,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
cmd += " --state %s" % state cmd += " --state %s" % state
return self.run_cli_tool(node, cmd) return self.run_cli_tool(node, cmd)
def set_share_group_offset_reset_strategy(self, group, strategy=None, node=None):
    """Alter the ``share.auto.offset.reset`` config for the given share group.

    Runs kafka-configs.sh once on a single node, so the config change is
    issued exactly one time rather than by every member of the group.

    :param group: share group id whose config is altered
    :param strategy: offset reset strategy value (e.g. "earliest"); when
        None, nothing is done
    :param node: node to run the CLI tool on; defaults to the first node
    :return: True if the tool reported "Completed", False otherwise
        (None when strategy is None — no command is run)
    """
    if strategy is None:
        return
    if node is None:
        node = self.nodes[0]
    configs_script = self.path.script("kafka-configs.sh", node)
    cmd = fix_opts_for_new_jvm(node)
    cmd += "%s --bootstrap-server %s --group %s --alter --add-config \"share.auto.offset.reset=%s\"" % (
        configs_script,
        self.bootstrap_servers(self.security_protocol),
        group,
        strategy,
    )
    # kafka-configs.sh prints a "Completed ..." line on success.
    output = self.run_cli_tool(node, cmd)
    return "Completed" in output
def describe_consumer_group(self, group, node=None, command_config=None): def describe_consumer_group(self, group, node=None, command_config=None):
""" Describe a consumer group. """ Describe a consumer group.
""" """

View File

@ -107,10 +107,9 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac
"collect_default": True} "collect_default": True}
} }
def __init__(self, context, num_nodes, kafka, topic, group_id, def __init__(self, context, num_nodes, kafka, topic, group_id, max_messages=-1,
max_messages=-1, acknowledgement_mode="auto", offset_reset_strategy="", acknowledgement_mode="auto", version=DEV_BRANCH, stop_timeout_sec=60,
version=DEV_BRANCH, stop_timeout_sec=60, log_level="INFO", jaas_override_variables=None, log_level="INFO", jaas_override_variables=None, on_record_consumed=None):
on_record_consumed=None):
""" """
:param jaas_override_variables: A dict of variables to be used in the jaas.conf template file :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file
""" """
@ -119,7 +118,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac
self.kafka = kafka self.kafka = kafka
self.topic = topic self.topic = topic
self.group_id = group_id self.group_id = group_id
self.offset_reset_strategy = offset_reset_strategy
self.max_messages = max_messages self.max_messages = max_messages
self.acknowledgement_mode = acknowledgement_mode self.acknowledgement_mode = acknowledgement_mode
self.prop_file = "" self.prop_file = ""
@ -134,7 +132,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac
self.total_records_acknowledged_failed = 0 self.total_records_acknowledged_failed = 0
self.consumed_records_offsets = set() self.consumed_records_offsets = set()
self.acknowledged_records_offsets = set() self.acknowledged_records_offsets = set()
self.is_offset_reset_strategy_set = False
for node in self.nodes: for node in self.nodes:
node.version = version node.version = version
@ -186,8 +183,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac
self._update_global_consumed(event) self._update_global_consumed(event)
elif name == "record_data" and self.on_record_consumed: elif name == "record_data" and self.on_record_consumed:
self.on_record_consumed(event, node) self.on_record_consumed(event, node)
elif name == "offset_reset_strategy_set":
self._on_offset_reset_strategy_set()
else: else:
self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event)) self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event))
@ -213,9 +208,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac
if key not in self.consumed_records_offsets: if key not in self.consumed_records_offsets:
self.consumed_records_offsets.add(key) self.consumed_records_offsets.add(key)
def _on_offset_reset_strategy_set(self):
    # Event callback: records that the consumer process reported the
    # "offset_reset_strategy_set" event (flag is read via
    # offset_reset_strategy_set() under the consumer's lock).
    self.is_offset_reset_strategy_set = True
def start_cmd(self, node): def start_cmd(self, node):
cmd = "" cmd = ""
cmd += "export LOG_DIR=%s;" % VerifiableShareConsumer.LOG_DIR cmd += "export LOG_DIR=%s;" % VerifiableShareConsumer.LOG_DIR
@ -227,8 +219,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac
cmd += " --acknowledgement-mode %s" % self.acknowledgement_mode cmd += " --acknowledgement-mode %s" % self.acknowledgement_mode
cmd += " --offset-reset-strategy %s" % self.offset_reset_strategy
cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol) cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol)
cmd += " --group-id %s --topic %s" % (self.group_id, self.topic) cmd += " --group-id %s --topic %s" % (self.group_id, self.topic)
@ -315,10 +305,6 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac
with self.lock: with self.lock:
return self.event_handlers[node].total_acknowledged_failed return self.event_handlers[node].total_acknowledged_failed
def offset_reset_strategy_set(self):
    # Thread-safe read of the flag set by the event-handler thread in
    # _on_offset_reset_strategy_set(); guarded by the shared lock.
    with self.lock:
        return self.is_offset_reset_strategy_set
def dead_nodes(self): def dead_nodes(self):
with self.lock: with self.lock:
return [handler.node for handler in self.event_handlers.values() return [handler.node for handler in self.event_handlers.values()

View File

@ -31,6 +31,8 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
num_producers = 1 num_producers = 1
num_brokers = 3 num_brokers = 3
share_group_id = "test_group_id"
default_timeout_sec = 600 default_timeout_sec = 600
def __init__(self, test_context): def __init__(self, test_context):
@ -41,7 +43,7 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
}) })
def setup_share_group(self, topic, **kwargs): def setup_share_group(self, topic, **kwargs):
consumer = super(ShareConsumerTest, self).setup_share_group(topic, **kwargs) consumer = super(ShareConsumerTest, self).setup_share_group(topic, group_id=self.share_group_id, **kwargs)
self.mark_for_collect(consumer, 'verifiable_share_consumer_stdout') self.mark_for_collect(consumer, 'verifiable_share_consumer_stdout')
return consumer return consumer
@ -124,7 +126,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
total_messages = 100000 total_messages = 100000
producer = self.setup_producer(self.TOPIC1["name"], max_messages=total_messages) producer = self.setup_producer(self.TOPIC1["name"], max_messages=total_messages)
consumer = self.setup_share_group(self.TOPIC1["name"], offset_reset_strategy="earliest") consumer = self.setup_share_group(self.TOPIC1["name"])
wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"),
timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest")
producer.start() producer.start()
@ -153,7 +158,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
total_messages = 1000000 total_messages = 1000000
producer = self.setup_producer(self.TOPIC2["name"], max_messages=total_messages, throughput=5000) producer = self.setup_producer(self.TOPIC2["name"], max_messages=total_messages, throughput=5000)
consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") consumer = self.setup_share_group(self.TOPIC2["name"])
wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"),
timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest")
producer.start() producer.start()
@ -181,7 +189,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
def test_broker_rolling_bounce(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, use_share_groups=True): def test_broker_rolling_bounce(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
producer = self.setup_producer(self.TOPIC2["name"]) producer = self.setup_producer(self.TOPIC2["name"])
consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") consumer = self.setup_share_group(self.TOPIC2["name"])
wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"),
timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest")
producer.start() producer.start()
self.await_produced_messages(producer, timeout_sec=self.default_timeout_sec) self.await_produced_messages(producer, timeout_sec=self.default_timeout_sec)
@ -213,7 +224,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
def test_broker_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1, use_share_groups=True): def test_broker_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1, use_share_groups=True):
producer = self.setup_producer(self.TOPIC2["name"]) producer = self.setup_producer(self.TOPIC2["name"])
consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") consumer = self.setup_share_group(self.TOPIC2["name"])
wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"),
timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest")
producer.start() producer.start()
self.await_produced_messages(producer, timeout_sec=self.default_timeout_sec) self.await_produced_messages(producer, timeout_sec=self.default_timeout_sec)
@ -256,7 +270,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
""" """
producer = self.setup_producer(self.TOPIC2["name"]) producer = self.setup_producer(self.TOPIC2["name"])
consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") consumer = self.setup_share_group(self.TOPIC2["name"])
wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"),
timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest")
producer.start() producer.start()
self.await_produced_messages(producer, timeout_sec=self.default_timeout_sec) self.await_produced_messages(producer, timeout_sec=self.default_timeout_sec)
@ -287,7 +304,10 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
def test_share_consumer_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_consumers=1, use_share_groups=True): def test_share_consumer_failure(self, clean_shutdown, metadata_quorum=quorum.isolated_kraft, num_failed_consumers=1, use_share_groups=True):
producer = self.setup_producer(self.TOPIC2["name"]) producer = self.setup_producer(self.TOPIC2["name"])
consumer = self.setup_share_group(self.TOPIC2["name"], offset_reset_strategy="earliest") consumer = self.setup_share_group(self.TOPIC2["name"])
wait_until(lambda: self.kafka.set_share_group_offset_reset_strategy(group=self.share_group_id, strategy="earliest"),
timeout_sec=20, backoff_sec=2, err_msg="share.auto.offset.reset not set to earliest")
# startup the producer and ensure that some records have been written # startup the producer and ensure that some records have been written
producer.start() producer.start()

View File

@ -49,10 +49,10 @@ class VerifiableShareConsumerTest(KafkaTest):
"""Override this since we're adding services outside of the constructor""" """Override this since we're adding services outside of the constructor"""
return super(VerifiableShareConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers return super(VerifiableShareConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers
def setup_share_group(self, topic, acknowledgement_mode="auto", group_id="test_group_id", offset_reset_strategy="", **kwargs): def setup_share_group(self, topic, acknowledgement_mode="auto", group_id="test_group_id", **kwargs):
return VerifiableShareConsumer(self.test_context, self.num_consumers, self.kafka, return VerifiableShareConsumer(self.test_context, self.num_consumers, self.kafka,
topic, group_id, acknowledgement_mode=acknowledgement_mode, topic, group_id, acknowledgement_mode=acknowledgement_mode,
offset_reset_strategy=offset_reset_strategy, log_level="TRACE", **kwargs) log_level="TRACE", **kwargs)
def setup_producer(self, topic, max_messages=-1, throughput=500): def setup_producer(self, topic, max_messages=-1, throughput=500):
return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic, return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,