From e1da3187221ac8747b672ea0cefa7cfc364fdb02 Mon Sep 17 00:00:00 2001 From: Lan Ding <53332773+DL1231@users.noreply.github.com> Date: Wed, 7 May 2025 05:05:02 +0800 Subject: [PATCH] MINOR: add boundary IT for delivery count (#19649) see https://github.com/apache/kafka/pull/19430#pullrequestreview-2809619176 Add boundary IT for delivery count. Reviewers: Apoorv Mittal --- .../clients/consumer/ShareConsumerTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 70e0e15c78b..7bb348143d1 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -2043,6 +2043,48 @@ public class ShareConsumerTest { } } + @ClusterTest( + serverProperties = { + @ClusterConfigProperty(key = "group.share.delivery.count.limit", value = "2"), + } + ) + public void testBehaviorOnDeliveryCountBoundary() { + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer( + "group1", + Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) { + + ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), null, + "key".getBytes(), "value".getBytes()); + producer.send(record); + producer.flush(); + + shareConsumer.subscribe(Set.of(tp.topic())); + ConsumerRecords records = waitedPoll(shareConsumer, 2500L, 1); + assertEquals(1, records.count()); + assertEquals((short) 1, records.records(tp).get(0).deliveryCount().get()); + // Acknowledge the record with AcknowledgeType.RELEASE. + shareConsumer.acknowledge(records.records(tp).get(0), AcknowledgeType.RELEASE); + Map> result = shareConsumer.commitSync(); + assertEquals(1, result.size()); + + // Consume again, the delivery count should be 2. + records = waitedPoll(shareConsumer, 2500L, 1); + assertEquals(1, records.count()); + assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get()); + + } + + // Start again and same record should be delivered + try (ShareConsumer shareConsumer = createShareConsumer("group1", Map.of())) { + shareConsumer.subscribe(Set.of(tp.topic())); + ConsumerRecords records = waitedPoll(shareConsumer, 2500L, 1); + assertEquals(1, records.count()); + assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get()); + } + } + @ClusterTest( brokers = 3, serverProperties = {