diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
index b5bd27cf41b..c00d1ddab90 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.BeforeEach;
 
@@ -452,6 +453,40 @@ public class PlaintextConsumerCommitTest {
         }
     }
 
+    /**
+     * Verifies that an async commit still pending when the consumer closes is failed promptly.
+     * All brokers are down, so the coordinator cannot be found while the consumer is closing.
+     */
+    @ClusterTest
+    public void testCommitAsyncFailsWhenCoordinatorUnavailableDuringClose() throws InterruptedException {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             var consumer = createConsumer(GroupProtocol.CONSUMER, false)
+        ) {
+            sendRecords(producer, tp, 3, System.currentTimeMillis());
+            consumer.assign(List.of(tp));
+
+            var callback = new CountConsumerCommitCallback();
+
+            // Shut down every broker before committing so that the commit cannot find the coordinator.
+            cluster.brokerIds().forEach(cluster::shutdownBroker);
+
+            TestUtils.waitForCondition(() -> cluster.aliveBrokers().isEmpty(), "All brokers should be shut down");
+
+            consumer.poll(Duration.ofMillis(500));
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), callback);
+
+            long startTime = System.currentTimeMillis();
+            consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
+            long closeDuration = System.currentTimeMillis() - startTime;
+
+            assertTrue(closeDuration < 1000, "The closing process for the consumer was too long: " + closeDuration + " ms");
+            assertTrue(callback.lastError.isPresent());
+            assertEquals(CommitFailedException.class, callback.lastError.get().getClass());
+            assertEquals("Failed to commit offsets: Coordinator unknown and consumer is closing", callback.lastError.get().getMessage());
+            assertEquals(1, callback.exceptionCount);
+        }
+    }
+
     // TODO: This only works in the new consumer, but should be fixed for the old consumer as well
     @ClusterTest
     public void testCommitAsyncCompletedBeforeConsumerCloses() throws InterruptedException {
@@ -575,6 +610,7 @@ public class PlaintextConsumerCommitTest {
 
     private static class CountConsumerCommitCallback implements OffsetCommitCallback {
         private int successCount = 0;
+        private int exceptionCount = 0;
         private Optional<Exception> lastError = Optional.empty();
 
         @Override
@@ -582,6 +618,7 @@ public class PlaintextConsumerCommitTest {
             if (exception == null) {
                 successCount += 1;
             } else {
+                exceptionCount += 1;
                 lastError = Optional.of(exception);
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index fe4d3806f2a..6aae084fd47 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -181,6 +181,16 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
         // poll when the coordinator node is known and fatal error is not present
         if (coordinatorRequestManager.coordinator().isEmpty()) {
             pendingRequests.maybeFailOnCoordinatorFatalError();
+
+            // No coordinator is known and the consumer is closing, so complete the drained
+            // pending commits exceptionally with a CommitFailedException.
+            if (closing && pendingRequests.hasUnsentRequests()) {
+                CommitFailedException exception = new CommitFailedException(
+                    "Failed to commit offsets: Coordinator unknown and consumer is closing");
+                pendingRequests.drainPendingCommits()
+                    .forEach(request -> request.future().completeExceptionally(exception));
+            }
+
             return EMPTY;
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 26d39715d27..afbb81eb53f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -1494,6 +1494,55 @@ public class CommitRequestManagerTest {
         assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
     }
 
+    /**
+     * With no known coordinator, a closing consumer and a fatal coordinator error, poll() returns
+     * EMPTY and the pending commit fails with the fatal error.
+     */
+    @Test
+    public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() {
+        CommitRequestManager commitRequestManager = create(true, 100);
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new TopicPartition("topic", 1),
+            new OffsetAndMetadata(0));
+
+        var commitFuture = commitRequestManager.commitAsync(offsets);
+
+        commitRequestManager.signalClose();
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+        when(coordinatorRequestManager.fatalError())
+            .thenReturn(Optional.of(new GroupAuthorizationException("Fatal error")));
+
+        assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(time.milliseconds()));
+
+        assertTrue(commitFuture.isCompletedExceptionally());
+
+        TestUtils.assertFutureThrows(GroupAuthorizationException.class, commitFuture, "Fatal error");
+    }
+
+    /**
+     * With no known coordinator and a closing consumer, poll() returns EMPTY and the pending
+     * commit fails with a CommitFailedException.
+     */
+    @Test
+    public void testPollWithClosingAndPendingRequests() {
+        CommitRequestManager commitRequestManager = create(true, 100);
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new TopicPartition("topic", 1),
+            new OffsetAndMetadata(0));
+
+        var commitFuture = commitRequestManager.commitAsync(offsets);
+
+        commitRequestManager.signalClose();
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+
+        assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(time.milliseconds()));
+
+        assertTrue(commitFuture.isCompletedExceptionally());
+
+        TestUtils.assertFutureThrows(CommitFailedException.class, commitFuture,
+            "Failed to commit offsets: Coordinator unknown and consumer is closing");
+    }
+
     // Supplies (error, isRetriable)
     private static Stream<Arguments> partitionDataErrorSupplier() {
         return Stream.of(