KAFKA-19217: Fix ShareConsumerTest.testComplexConsumer flakiness. (#19734)

* Added alter offset to earliest.
* Reduced produce time to reduce overall test time.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-05-16 13:30:16 +05:30 committed by GitHub
parent 8fc41f9ca2
commit e70d05ecda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 8 additions and 6 deletions

View File

@ -2108,7 +2108,7 @@ public class ShareConsumerTest {
ClientState prodState = new ClientState();
// produce messages until we want
// Produce messages until we want.
service.execute(() -> {
try (Producer<byte[], byte[]> producer = createProducer()) {
while (!prodState.done().get()) {
@ -2120,13 +2120,14 @@ public class ShareConsumerTest {
}
});
// init a complex share consumer
// Init a complex share consumer.
ComplexShareConsumer<byte[], byte[]> complexCons1 = new ComplexShareConsumer<>(
cluster.bootstrapServers(),
topicName,
groupId,
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)
);
alterShareAutoOffsetReset(groupId, "earliest");
service.schedule(
complexCons1,
@ -2134,13 +2135,14 @@ public class ShareConsumerTest {
TimeUnit.MILLISECONDS
);
// let the complex consumer read the messages
service.schedule(() -> prodState.done().set(true), 10L, TimeUnit.SECONDS);
// Let the complex consumer read the messages.
service.schedule(() -> prodState.done().set(true), 5L, TimeUnit.SECONDS);
// all messages which can be read are read, some would be redelivered
// All messages which can be read are read, some would be redelivered (roughly 3 times the records produced).
TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
int delta = complexCons1.recordsRead() - (int) (prodState.count().get() * 3 * 0.95); // 3 times with margin of error (5%).
assertTrue(prodState.count().get() < complexCons1.recordsRead(),
assertTrue(delta > 0,
String.format("Producer (%d) and share consumer (%d) record count mismatch.", prodState.count().get(), complexCons1.recordsRead()));
shutdownExecutorService(service);