From e70d05ecda8095cb0909b4c20b4d4c66b47c7bcf Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Fri, 16 May 2025 13:30:16 +0530 Subject: [PATCH] KAFKA-19217: Fix ShareConsumerTest.testComplexConsumer flakiness. (#19734) * Added alter offset to earliest. * Reduced produce time to reduce overall test time. Reviewers: Andrew Schofield --- .../kafka/clients/consumer/ShareConsumerTest.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 45d0ffa1a98..a27b8d9b53f 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -2108,7 +2108,7 @@ public class ShareConsumerTest { ClientState prodState = new ClientState(); - // produce messages until we want + // Produce messages until we want. service.execute(() -> { try (Producer producer = createProducer()) { while (!prodState.done().get()) { @@ -2120,13 +2120,14 @@ public class ShareConsumerTest { } }); - // init a complex share consumer + // Init a complex share consumer. ComplexShareConsumer complexCons1 = new ComplexShareConsumer<>( cluster.bootstrapServers(), topicName, groupId, Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT) ); + alterShareAutoOffsetReset(groupId, "earliest"); service.schedule( complexCons1, @@ -2134,13 +2135,14 @@ public class ShareConsumerTest { TimeUnit.MILLISECONDS ); - // let the complex consumer read the messages - service.schedule(() -> prodState.done().set(true), 10L, TimeUnit.SECONDS); + // Let the complex consumer read the messages. + service.schedule(() -> prodState.done().set(true), 5L, TimeUnit.SECONDS); - // all messages which can be read are read, some would be redelivered + // All messages which can be read are read, some would be redelivered (roughly 3 times the records produced). TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!"); + int delta = complexCons1.recordsRead() - (int) (prodState.count().get() * 3 * 0.95); // 3 times with margin of error (5%). - assertTrue(prodState.count().get() < complexCons1.recordsRead(), + assertTrue(delta > 0, String.format("Producer (%d) and share consumer (%d) record count mismatch.", prodState.count().get(), complexCons1.recordsRead())); shutdownExecutorService(service);