KAFKA-18763: changed the assertion statement for acknowledgements to include only successful acks (#18846)

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Chirag Wadhwa 2025-02-10 17:06:26 +05:30 committed by GitHub
parent be89ce5f61
commit 7fef5b8646
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 10 additions and 10 deletions

View File

@ -34,7 +34,7 @@ class ShareConsumerEventHandler(object):
self.node = node self.node = node
self.idx = idx self.idx = idx
self.total_consumed = 0 self.total_consumed = 0
self.total_acknowledged = 0 self.total_acknowledged_successfully = 0
self.total_acknowledged_failed = 0 self.total_acknowledged_failed = 0
self.consumed_per_partition = {} self.consumed_per_partition = {}
self.acknowledged_per_partition = {} self.acknowledged_per_partition = {}
@ -52,7 +52,7 @@ class ShareConsumerEventHandler(object):
def handle_offsets_acknowledged(self, event, node, logger): def handle_offsets_acknowledged(self, event, node, logger):
if event["success"]: if event["success"]:
self.total_acknowledged += event["count"] self.total_acknowledged_successfully += event["count"]
for share_partition_data in event["partitions"]: for share_partition_data in event["partitions"]:
topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"]) topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
self.acknowledged_per_partition[topic_partition] = self.acknowledged_per_partition.get(topic_partition, 0) + share_partition_data["count"] self.acknowledged_per_partition[topic_partition] = self.acknowledged_per_partition.get(topic_partition, 0) + share_partition_data["count"]
@ -291,7 +291,7 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac
with self.lock: with self.lock:
return self.total_records_acknowledged + self.total_records_acknowledged_failed return self.total_records_acknowledged + self.total_records_acknowledged_failed
def total_successful_acknowledged(self): def total_acknowledged_successfully(self):
with self.lock: with self.lock:
return self.total_records_acknowledged return self.total_records_acknowledged
@ -305,11 +305,11 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Bac
def total_acknowledged_for_a_share_consumer(self, node): def total_acknowledged_for_a_share_consumer(self, node):
with self.lock: with self.lock:
return self.event_handlers[node].total_acknowledged + self.event_handlers[node].total_acknowledged_failed return self.event_handlers[node].total_acknowledged_successfully + self.event_handlers[node].total_acknowledged_failed
def total_successful_acknowledged_for_a_share_consumer(self, node): def total_acknowledged_sucessfully_for_a_share_consumer(self, node):
with self.lock: with self.lock:
return self.event_handlers[node].total_acknowledged return self.event_handlers[node].total_acknowledged_successfully
def total_failed_acknowledged_for_a_share_consumer(self, node): def total_failed_acknowledged_for_a_share_consumer(self, node):
with self.lock: with self.lock:

View File

@ -133,11 +133,11 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
self.await_acknowledged_messages(consumer, min_messages=total_messages, timeout_sec=self.default_timeout_sec) self.await_acknowledged_messages(consumer, min_messages=total_messages, timeout_sec=self.default_timeout_sec)
assert consumer.total_consumed() >= producer.num_acked assert consumer.total_consumed() >= producer.num_acked
assert consumer.total_acknowledged() == producer.num_acked assert consumer.total_acknowledged_successfully() == producer.num_acked
for event_handler in consumer.event_handlers.values(): for event_handler in consumer.event_handlers.values():
assert event_handler.total_consumed > 0 assert event_handler.total_consumed > 0
assert event_handler.total_acknowledged > 0 assert event_handler.total_acknowledged_successfully > 0
producer.stop() producer.stop()
consumer.stop_all() consumer.stop_all()
@ -161,11 +161,11 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
self.await_acknowledged_messages(consumer, min_messages=total_messages, timeout_sec=self.default_timeout_sec) self.await_acknowledged_messages(consumer, min_messages=total_messages, timeout_sec=self.default_timeout_sec)
assert consumer.total_consumed() >= producer.num_acked assert consumer.total_consumed() >= producer.num_acked
assert consumer.total_acknowledged() == producer.num_acked assert consumer.total_acknowledged_successfully() == producer.num_acked
for event_handler in consumer.event_handlers.values(): for event_handler in consumer.event_handlers.values():
assert event_handler.total_consumed > 0 assert event_handler.total_consumed > 0
assert event_handler.total_acknowledged > 0 assert event_handler.total_acknowledged_successfully > 0
for topic_partition in self.get_topic_partitions(self.TOPIC2): for topic_partition in self.get_topic_partitions(self.TOPIC2):
assert topic_partition in event_handler.consumed_per_partition assert topic_partition in event_handler.consumed_per_partition
assert event_handler.consumed_per_partition[topic_partition] > 0 assert event_handler.consumed_per_partition[topic_partition] > 0