KAFKA-17294 Handle retriable errors when fetching offsets in new consumer (#16833)

The original behavior was implemented to maintain the behavior of the Classic consumer, where the ConsumerCoordinator would do the same when handling the OffsetFetchResponse. This behavior is being updated for the legacy coordinator as part of KAFKA-17279, to retry on all retriable errors.

We should review and update the CommitRequestManager to align with this, and retry on all retriable errors, which seems sensible when fetching offsets.

The corresponding PR for classic consumer is #16826

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2024-09-03 19:05:29 +08:00 committed by GitHub
parent f59d829381
commit 2f9b236259
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 30 deletions

View File

@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
@ -981,26 +982,28 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
final Errors responseError) {
log.debug("Offset fetch failed: {}", responseError.message());
onFailedAttempt(currentTimeMs);
ApiException exception = responseError.exception();
if (responseError == COORDINATOR_LOAD_IN_PROGRESS) {
future.completeExceptionally(responseError.exception());
future.completeExceptionally(exception);
} else if (responseError == Errors.UNKNOWN_MEMBER_ID) {
log.error("OffsetFetch failed with {} because the member is not part of the group" +
" anymore.", responseError);
future.completeExceptionally(responseError.exception());
future.completeExceptionally(exception);
} else if (responseError == Errors.STALE_MEMBER_EPOCH) {
log.error("OffsetFetch failed with {} and the consumer is not part " +
"of the group anymore (it probably left the group, got fenced" +
" or failed). The request cannot be retried and will fail.", responseError);
future.completeExceptionally(responseError.exception());
future.completeExceptionally(exception);
} else if (responseError == Errors.NOT_COORDINATOR || responseError == Errors.COORDINATOR_NOT_AVAILABLE) {
// Re-discover the coordinator and retry
coordinatorRequestManager.markCoordinatorUnknown("error response " + responseError.name(), currentTimeMs);
future.completeExceptionally(responseError.exception());
future.completeExceptionally(exception);
} else if (exception instanceof RetriableException) {
future.completeExceptionally(exception);
} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
} else {
// Fail with a non-retriable KafkaException for all unexpected errors (even if
// they are retriable)
// Fail with a non-retriable KafkaException for all unexpected errors
future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
}
}
@ -1188,8 +1191,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
unsentOffsetFetches.stream()
.collect(Collectors.partitioningBy(request -> request.canSendRequest(currentTimeMs)));
failAndRemoveExpiredFetchRequests();
// Add all sendable offset fetch requests to the unsentRequests list and to the inflightOffsetFetches list
for (OffsetFetchRequestState request : partitionedBySendability.get(true)) {
request.onSendAttempt(currentTimeMs);
@ -1214,15 +1215,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
requestsToPurge.forEach(RetriableRequestState::maybeExpire);
}
/**
* Find the unsent fetch requests that have expired, remove them and complete their
* futures with a TimeoutException.
*/
private void failAndRemoveExpiredFetchRequests() {
Queue<OffsetFetchRequestState> requestsToPurge = new LinkedList<>(unsentOffsetFetches);
requestsToPurge.forEach(RetriableRequestState::maybeExpire);
}
private void clearAll() {
unsentOffsetCommits.clear();
unsentOffsetFetches.clear();

View File

@ -649,8 +649,8 @@ public class CommitRequestManagerTest {
1,
error);
// we only want to make sure to purge the outbound buffer for non-retriables, so retriable will be re-queued.
if (isRetriableOnOffsetFetch(error))
testRetriable(commitRequestManager, futures);
if (error.exception() instanceof RetriableException)
testRetriable(commitRequestManager, futures, error);
else {
testNonRetriable(futures);
assertEmptyPendingRequests(commitRequestManager);
@ -671,14 +671,15 @@ public class CommitRequestManagerTest {
1,
error);
if (isRetriableOnOffsetFetch(error)) {
if (error.exception() instanceof RetriableException) {
futures.forEach(f -> assertFalse(f.isDone()));
// Insert a long enough sleep to force a timeout of the operation. Invoke poll() again so that each
// OffsetFetchRequestState is evaluated via isExpired().
time.sleep(defaultApiTimeoutMs);
assertFalse(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
commitRequestManager.poll(time.milliseconds());
NetworkClientDelegate.PollResult poll = commitRequestManager.poll(time.milliseconds());
mimicResponse(error, poll);
futures.forEach(f -> assertFutureThrows(f, TimeoutException.class));
assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
} else {
@ -687,10 +688,6 @@ public class CommitRequestManagerTest {
}
}
private boolean isRetriableOnOffsetFetch(Errors error) {
return error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE;
}
@Test
public void testSuccessfulOffsetFetch() {
CommitRequestManager commitManager = create(false, 100);
@ -1204,13 +1201,31 @@ public class CommitRequestManagerTest {
}
private void testRetriable(final CommitRequestManager commitRequestManager,
final List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
final List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures,
final Errors error
) {
futures.forEach(f -> assertFalse(f.isDone()));
// The manager should backoff for 100ms
time.sleep(100);
commitRequestManager.poll(time.milliseconds());
// The manager should backoff before retry
time.sleep(retryBackoffMs);
NetworkClientDelegate.PollResult poll = commitRequestManager.poll(time.milliseconds());
assertEquals(1, poll.unsentRequests.size());
futures.forEach(f -> assertFalse(f.isDone()));
mimicResponse(error, poll);
// Sleep util timeout
time.sleep(defaultApiTimeoutMs);
poll = commitRequestManager.poll(time.milliseconds());
assertEquals(1, poll.unsentRequests.size());
mimicResponse(error, poll);
futures.forEach(f -> {
assertTrue(f.isCompletedExceptionally());
assertFutureThrows(f, TimeoutException.class);
});
}
private void mimicResponse(Errors error, NetworkClientDelegate.PollResult poll) {
poll.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(poll.unsentRequests.get(0), new HashSet<>(), error));
}
private void testNonRetriable(final List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
@ -1299,7 +1314,7 @@ public class CommitRequestManagerTest {
Errors.NONE,
false));
if (isRetriable)
testRetriable(commitRequestManager, Collections.singletonList(future));
testRetriable(commitRequestManager, Collections.singletonList(future), error);
else
testNonRetriable(Collections.singletonList(future));
}