MINOR: add boundary IT for delivery count (#19649)
CI / build (push) Waiting to run Details

see
https://github.com/apache/kafka/pull/19430#pullrequestreview-2809619176
Add boundary IT for delivery count.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Lan Ding 2025-05-07 05:05:02 +08:00 committed by GitHub
parent 7953092108
commit e1da318722
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 42 additions and 0 deletions

View File

@ -2043,6 +2043,48 @@ public class ShareConsumerTest {
} }
} }
@ClusterTest(
serverProperties = {
@ClusterConfigProperty(key = "group.share.delivery.count.limit", value = "2"),
}
)
public void testBehaviorOnDeliveryCountBoundary() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null,
"key".getBytes(), "value".getBytes());
producer.send(record);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
assertEquals((short) 1, records.records(tp).get(0).deliveryCount().get());
// Acknowledge the record with AcknowledgeType.RELEASE.
shareConsumer.acknowledge(records.records(tp).get(0), AcknowledgeType.RELEASE);
Map<TopicIdPartition, Optional<KafkaException>> result = shareConsumer.commitSync();
assertEquals(1, result.size());
// Consume again, the delivery count should be 2.
records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
}
// Start again and same record should be delivered
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of())) {
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
}
}
@ClusterTest( @ClusterTest(
brokers = 3, brokers = 3,
serverProperties = { serverProperties = {