From 6826f45fd8588dcf16474fa0a9d6c38d195dc37a Mon Sep 17 00:00:00 2001 From: Ming-Yen Chung Date: Mon, 2 Jun 2025 01:01:24 +0800 Subject: [PATCH] KAFKA-19352 Create offsets topic to fix flaky testCommitAsyncCompletedBeforeConsumerCloses (#19873) The flakiness occurs when the offsets topic does not yet exist. Hence, the issue is mitigated by creating the offsets topic in `setup()`. This serves as a workaround. The root cause is tracked in [KAFKA-19357](https://issues.apache.org/jira/browse/KAFKA-19357). I ran the test 100 times on my Mac and all of them passed. Reviewers: Ken Huang , Chia-Ping Tsai --- .../kafka/clients/consumer/PlaintextConsumerCommitTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java index c3f0aedccc6..162b7baa371 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java @@ -453,10 +453,14 @@ public class PlaintextConsumerCommitTest { // TODO: This only works in the new consumer, but should be fixed for the old consumer as well @ClusterTest - public void testCommitAsyncCompletedBeforeConsumerCloses() { + public void testCommitAsyncCompletedBeforeConsumerCloses() throws InterruptedException { // This is testing the contract that asynchronous offset commit are completed before the consumer // is closed, even when no commit sync is performed as part of the close (due to auto-commit // disabled, or simply because there are no consumed offsets). + + // Create offsets topic to ensure coordinator is available during close + cluster.createTopic(Topic.GROUP_METADATA_TOPIC_NAME, Integer.parseInt(OFFSETS_TOPIC_PARTITIONS), Short.parseShort(OFFSETS_TOPIC_REPLICATION)); + try (Producer producer = cluster.producer(Map.of(ProducerConfig.ACKS_CONFIG, "all")); var consumer = createConsumer(GroupProtocol.CONSUMER, false) ) {