KAFKA-19357 AsyncConsumer#close hangs as commitAsync never completes when coordinator is missing (#19914)

Problem:  When AsyncConsumer is closing, CoordinatorRequestManager stops
looking for coordinator by returning EMPTY in poll() method when closing
flag is true. This prevents commitAsync() and other
coordinator-dependent operations from completing, causing close() to
hang until timeout.

Solution:
Modified the closing flag check in poll() method of
CommitRequestManager to be more targeted:
- When both coordinators are unknown and the consumer is closing, only
return EMPTY
- When this condition is met, proactively fail all pending commit
requests with CommitFailedException
- This allows coordinator lookup to continue when coordinator is
available during shutdown, while preventing indefinite hangs when
coordinator is unreachable

Reviewers: PoAn Yang <payang@apache.org>, Andrew Schofield
 <aschofield@confluent.io>, TengYao Chi <kitingiao@gmail.com>, Kirk True
 <kirk@kirktrue.pro>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Lan Ding
 <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
 <s7133700@gmail.com>, KuoChe <kuoche1712003@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
YuChia Ma 2025-09-29 23:06:56 +08:00 committed by GitHub
parent 3c0843961b
commit 92169b8f08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 86 additions and 0 deletions

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
@ -452,6 +453,40 @@ public class PlaintextConsumerCommitTest {
}
}
/**
* This is testing when closing the consumer but commit request has already been sent.
* During the closing, the consumer won't find the coordinator anymore.
*/
@ClusterTest
public void testCommitAsyncFailsWhenCoordinatorUnavailableDuringClose() throws InterruptedException {
try (Producer<byte[], byte[]> producer = cluster.producer();
var consumer = createConsumer(GroupProtocol.CONSUMER, false)
) {
sendRecords(producer, tp, 3, System.currentTimeMillis());
consumer.assign(List.of(tp));
var callback = new CountConsumerCommitCallback();
// Close the coordinator before committing because otherwise the commit will fail to find the coordinator.
cluster.brokerIds().forEach(cluster::shutdownBroker);
TestUtils.waitForCondition(() -> cluster.aliveBrokers().isEmpty(), "All brokers should be shut down");
consumer.poll(Duration.ofMillis(500));
consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), callback);
long startTime = System.currentTimeMillis();
consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
long closeDuration = System.currentTimeMillis() - startTime;
assertTrue(closeDuration < 1000, "The closing process for the consumer was too long: " + closeDuration + " ms");
assertTrue(callback.lastError.isPresent());
assertEquals(CommitFailedException.class, callback.lastError.get().getClass());
assertEquals("Failed to commit offsets: Coordinator unknown and consumer is closing", callback.lastError.get().getMessage());
assertEquals(1, callback.exceptionCount);
}
}
// TODO: This only works in the new consumer, but should be fixed for the old consumer as well
@ClusterTest
public void testCommitAsyncCompletedBeforeConsumerCloses() throws InterruptedException {
@ -575,6 +610,7 @@ public class PlaintextConsumerCommitTest {
private static class CountConsumerCommitCallback implements OffsetCommitCallback {
private int successCount = 0;
private int exceptionCount = 0;
private Optional<Exception> lastError = Optional.empty();
@Override
@ -582,6 +618,7 @@ public class PlaintextConsumerCommitTest {
if (exception == null) {
successCount += 1;
} else {
exceptionCount += 1;
lastError = Optional.of(exception);
}
}

View File

@ -181,6 +181,14 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
// poll when the coordinator node is known and fatal error is not present
if (coordinatorRequestManager.coordinator().isEmpty()) {
pendingRequests.maybeFailOnCoordinatorFatalError();
if (closing && pendingRequests.hasUnsentRequests()) {
CommitFailedException exception = new CommitFailedException(
"Failed to commit offsets: Coordinator unknown and consumer is closing");
pendingRequests.drainPendingCommits()
.forEach(request -> request.future().completeExceptionally(exception));
}
return EMPTY;
}

View File

@ -1494,6 +1494,47 @@ public class CommitRequestManagerTest {
assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
}
@Test
public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() {
CommitRequestManager commitRequestManager = create(true, 100);
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
var commitFuture = commitRequestManager.commitAsync(offsets);
commitRequestManager.signalClose();
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
when(coordinatorRequestManager.fatalError())
.thenReturn(Optional.of(new GroupAuthorizationException("Fatal error")));
assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(time.milliseconds()));
assertTrue(commitFuture.isCompletedExceptionally());
TestUtils.assertFutureThrows(GroupAuthorizationException.class, commitFuture, "Fatal error");
}
@Test
public void testPollWithClosingAndPendingRequests() {
CommitRequestManager commitRequestManager = create(true, 100);
Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new TopicPartition("topic", 1),
new OffsetAndMetadata(0));
var commitFuture = commitRequestManager.commitAsync(offsets);
commitRequestManager.signalClose();
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(time.milliseconds()));
assertTrue(commitFuture.isCompletedExceptionally());
TestUtils.assertFutureThrows(CommitFailedException.class, commitFuture,
"Failed to commit offsets: Coordinator unknown and consumer is closing");
}
// Supplies (error, isRetriable)
private static Stream<Arguments> partitionDataErrorSupplier() {
return Stream.of(