From 92169b8f08c21826612a0aba01e5eda3464923c7 Mon Sep 17 00:00:00 2001 From: YuChia Ma <79797958+Mirai1129@users.noreply.github.com> Date: Mon, 29 Sep 2025 23:06:56 +0800 Subject: [PATCH] KAFKA-19357 AsyncConsumer#close hangs as commitAsync never completes when coordinator is missing (#19914) Problem: When AsyncConsumer is closing, CoordinatorRequestManager stops looking for coordinator by returning EMPTY in poll() method when closing flag is true. This prevents commitAsync() and other coordinator-dependent operations from completing, causing close() to hang until timeout. Solution: Modified the closing flag check in poll() method of CommitRequestManager to be more targeted: - When both coordinators are unknown and the consumer is closing, only return EMPTY - When this condition is met, proactively fail all pending commit requests with CommitFailedException - This allows coordinator lookup to continue when coordinator is available during shutdown, while preventing indefinite hangs when coordinator is unreachable Reviewers: PoAn Yang , Andrew Schofield , TengYao Chi , Kirk True , Jhen-Yung Hsu , Lan Ding , TaiJuWu , Ken Huang , KuoChe , Chia-Ping Tsai --- .../consumer/PlaintextConsumerCommitTest.java | 37 +++++++++++++++++ .../internals/CommitRequestManager.java | 8 ++++ .../internals/CommitRequestManagerTest.java | 41 +++++++++++++++++++ 3 files changed, 86 insertions(+) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java index b5bd27cf41b..c00d1ddab90 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.common.test.api.Type; import org.apache.kafka.test.MockConsumerInterceptor; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.BeforeEach; @@ -452,6 +453,40 @@ public class PlaintextConsumerCommitTest { } } + /** + * This is testing when closing the consumer but commit request has already been sent. + * During the closing, the consumer won't find the coordinator anymore. + */ + @ClusterTest + public void testCommitAsyncFailsWhenCoordinatorUnavailableDuringClose() throws InterruptedException { + try (Producer producer = cluster.producer(); + var consumer = createConsumer(GroupProtocol.CONSUMER, false) + ) { + sendRecords(producer, tp, 3, System.currentTimeMillis()); + consumer.assign(List.of(tp)); + + var callback = new CountConsumerCommitCallback(); + + // Close the coordinator before committing because otherwise the commit will fail to find the coordinator. + cluster.brokerIds().forEach(cluster::shutdownBroker); + + TestUtils.waitForCondition(() -> cluster.aliveBrokers().isEmpty(), "All brokers should be shut down"); + + consumer.poll(Duration.ofMillis(500)); + consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), callback); + + long startTime = System.currentTimeMillis(); + consumer.close(CloseOptions.timeout(Duration.ofMillis(500))); + long closeDuration = System.currentTimeMillis() - startTime; + + assertTrue(closeDuration < 1000, "The closing process for the consumer was too long: " + closeDuration + " ms"); + assertTrue(callback.lastError.isPresent()); + assertEquals(CommitFailedException.class, callback.lastError.get().getClass()); + assertEquals("Failed to commit offsets: Coordinator unknown and consumer is closing", callback.lastError.get().getMessage()); + assertEquals(1, callback.exceptionCount); + } + } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well @ClusterTest public void testCommitAsyncCompletedBeforeConsumerCloses() throws InterruptedException { @@ -575,6 +610,7 @@ public class PlaintextConsumerCommitTest { private static class CountConsumerCommitCallback implements OffsetCommitCallback { private int successCount = 0; + private int exceptionCount = 0; private Optional lastError = Optional.empty(); @Override @@ -582,6 +618,7 @@ public class PlaintextConsumerCommitTest { if (exception == null) { successCount += 1; } else { + exceptionCount += 1; lastError = Optional.of(exception); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index fe4d3806f2a..6aae084fd47 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -181,6 +181,14 @@ public class CommitRequestManager implements RequestManager, MemberStateListener // poll when the coordinator node is known and fatal error is not present if (coordinatorRequestManager.coordinator().isEmpty()) { pendingRequests.maybeFailOnCoordinatorFatalError(); + + if (closing && pendingRequests.hasUnsentRequests()) { + CommitFailedException exception = new CommitFailedException( + "Failed to commit offsets: Coordinator unknown and consumer is closing"); + pendingRequests.drainPendingCommits() + .forEach(request -> request.future().completeExceptionally(exception)); + } + return EMPTY; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 26d39715d27..afbb81eb53f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -1494,6 +1494,47 @@ public class CommitRequestManagerTest { assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty()); } + @Test + public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() { + CommitRequestManager commitRequestManager = create(true, 100); + + Map offsets = Map.of(new TopicPartition("topic", 1), + new OffsetAndMetadata(0)); + + var commitFuture = commitRequestManager.commitAsync(offsets); + + commitRequestManager.signalClose(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + when(coordinatorRequestManager.fatalError()) + .thenReturn(Optional.of(new GroupAuthorizationException("Fatal error"))); + + assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(time.milliseconds())); + + assertTrue(commitFuture.isCompletedExceptionally()); + + TestUtils.assertFutureThrows(GroupAuthorizationException.class, commitFuture, "Fatal error"); + } + + @Test + public void testPollWithClosingAndPendingRequests() { + CommitRequestManager commitRequestManager = create(true, 100); + + Map offsets = Map.of(new TopicPartition("topic", 1), + new OffsetAndMetadata(0)); + + var commitFuture = commitRequestManager.commitAsync(offsets); + + commitRequestManager.signalClose(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + + assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(time.milliseconds())); + + assertTrue(commitFuture.isCompletedExceptionally()); + + TestUtils.assertFutureThrows(CommitFailedException.class, commitFuture, + "Failed to commit offsets: Coordinator unknown and consumer is closing"); + } + // Supplies (error, isRetriable) private static Stream partitionDataErrorSupplier() { return Stream.of(