diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 939d078ebaf..e435b3acd5c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.StaleMemberEpochException; @@ -981,26 +982,28 @@ public class CommitRequestManager implements RequestManager, MemberStateListener final Errors responseError) { log.debug("Offset fetch failed: {}", responseError.message()); onFailedAttempt(currentTimeMs); + ApiException exception = responseError.exception(); if (responseError == COORDINATOR_LOAD_IN_PROGRESS) { - future.completeExceptionally(responseError.exception()); + future.completeExceptionally(exception); } else if (responseError == Errors.UNKNOWN_MEMBER_ID) { log.error("OffsetFetch failed with {} because the member is not part of the group" + " anymore.", responseError); - future.completeExceptionally(responseError.exception()); + future.completeExceptionally(exception); } else if (responseError == Errors.STALE_MEMBER_EPOCH) { log.error("OffsetFetch failed with {} and the consumer is not part " + "of the group anymore (it probably left the group, got fenced" + " or failed). The request cannot be retried and will fail.", responseError); - future.completeExceptionally(responseError.exception()); + future.completeExceptionally(exception); } else if (responseError == Errors.NOT_COORDINATOR || responseError == Errors.COORDINATOR_NOT_AVAILABLE) { // Re-discover the coordinator and retry coordinatorRequestManager.markCoordinatorUnknown("error response " + responseError.name(), currentTimeMs); - future.completeExceptionally(responseError.exception()); + future.completeExceptionally(exception); + } else if (exception instanceof RetriableException) { + future.completeExceptionally(exception); } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId)); } else { - // Fail with a non-retriable KafkaException for all unexpected errors (even if - // they are retriable) + // Fail with a non-retriable KafkaException for all unexpected errors future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); } } @@ -1188,8 +1191,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener unsentOffsetFetches.stream() .collect(Collectors.partitioningBy(request -> request.canSendRequest(currentTimeMs))); - failAndRemoveExpiredFetchRequests(); - // Add all sendable offset fetch requests to the unsentRequests list and to the inflightOffsetFetches list for (OffsetFetchRequestState request : partitionedBySendability.get(true)) { request.onSendAttempt(currentTimeMs); @@ -1214,15 +1215,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener requestsToPurge.forEach(RetriableRequestState::maybeExpire); } - /** - * Find the unsent fetch requests that have expired, remove them and complete their - * futures with a TimeoutException. - */ - private void failAndRemoveExpiredFetchRequests() { - Queue requestsToPurge = new LinkedList<>(unsentOffsetFetches); - requestsToPurge.forEach(RetriableRequestState::maybeExpire); - } - private void clearAll() { unsentOffsetCommits.clear(); unsentOffsetFetches.clear(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 523abf4125c..3c7347e6d02 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -649,8 +649,8 @@ public class CommitRequestManagerTest { 1, error); // we only want to make sure to purge the outbound buffer for non-retriables, so retriable will be re-queued. - if (isRetriableOnOffsetFetch(error)) - testRetriable(commitRequestManager, futures); + if (error.exception() instanceof RetriableException) + testRetriable(commitRequestManager, futures, error); else { testNonRetriable(futures); assertEmptyPendingRequests(commitRequestManager); @@ -671,14 +671,15 @@ public class CommitRequestManagerTest { 1, error); - if (isRetriableOnOffsetFetch(error)) { + if (error.exception() instanceof RetriableException) { futures.forEach(f -> assertFalse(f.isDone())); // Insert a long enough sleep to force a timeout of the operation. Invoke poll() again so that each // OffsetFetchRequestState is evaluated via isExpired(). time.sleep(defaultApiTimeoutMs); assertFalse(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); - commitRequestManager.poll(time.milliseconds()); + NetworkClientDelegate.PollResult poll = commitRequestManager.poll(time.milliseconds()); + mimicResponse(error, poll); futures.forEach(f -> assertFutureThrows(f, TimeoutException.class)); assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); } else { @@ -687,10 +688,6 @@ public class CommitRequestManagerTest { } } - private boolean isRetriableOnOffsetFetch(Errors error) { - return error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE; - } - @Test public void testSuccessfulOffsetFetch() { CommitRequestManager commitManager = create(false, 100); @@ -1204,13 +1201,31 @@ public class CommitRequestManagerTest { } private void testRetriable(final CommitRequestManager commitRequestManager, - final List>> futures) { + final List>> futures, + final Errors error + ) { futures.forEach(f -> assertFalse(f.isDone())); - // The manager should backoff for 100ms - time.sleep(100); - commitRequestManager.poll(time.milliseconds()); + // The manager should backoff before retry + time.sleep(retryBackoffMs); + NetworkClientDelegate.PollResult poll = commitRequestManager.poll(time.milliseconds()); + assertEquals(1, poll.unsentRequests.size()); futures.forEach(f -> assertFalse(f.isDone())); + mimicResponse(error, poll); + + // Sleep util timeout + time.sleep(defaultApiTimeoutMs); + poll = commitRequestManager.poll(time.milliseconds()); + assertEquals(1, poll.unsentRequests.size()); + mimicResponse(error, poll); + futures.forEach(f -> { + assertTrue(f.isCompletedExceptionally()); + assertFutureThrows(f, TimeoutException.class); + }); + } + + private void mimicResponse(Errors error, NetworkClientDelegate.PollResult poll) { + poll.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(poll.unsentRequests.get(0), new HashSet<>(), error)); } private void testNonRetriable(final List>> futures) { @@ -1299,7 +1314,7 @@ public class CommitRequestManagerTest { Errors.NONE, false)); if (isRetriable) - testRetriable(commitRequestManager, Collections.singletonList(future)); + testRetriable(commitRequestManager, Collections.singletonList(future), error); else testNonRetriable(Collections.singletonList(future)); }