KAFKA-17279: Handle retriable errors from offset fetches (#16826)

Handle retriable errors from offset fetches in ConsumerCoordinator.

Reviewers: Lianet Magrans <lianetmr@gmail.com>, David Jacot <djacot@confluent.io>
This commit is contained in:
Sean Quah 2024-08-20 14:13:25 +01:00 committed by GitHub
parent de67ac6a9a
commit c207438823
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 23 deletions

View File

@ -1457,15 +1457,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (responseError != Errors.NONE) {
log.debug("Offset fetch failed: {}", responseError.message());
if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
// just retry
future.raise(responseError);
} else if (responseError == Errors.NOT_COORDINATOR) {
if (responseError == Errors.COORDINATOR_NOT_AVAILABLE ||
responseError == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry
markCoordinatorUnknown(responseError);
future.raise(responseError);
} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else if (responseError.exception() instanceof RetriableException) {
// retry
future.raise(responseError);
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
}

View File

@ -87,6 +87,8 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@ -3148,21 +3150,6 @@ public abstract class ConsumerCoordinatorTest {
assertEquals(singleton(topic1), exception.unauthorizedTopics());
}
@Test
public void testRefreshOffsetLoadInProgress() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
subscriptions.assignFromUser(singleton(t1p));
client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
coordinator.initWithCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
assertTrue(subscriptions.hasAllFetchPositions());
assertEquals(100L, subscriptions.position(t1p).offset);
}
@Test
public void testRefreshOffsetsGroupNotAuthorized() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@ -3206,14 +3193,22 @@ public abstract class ConsumerCoordinatorTest {
assertThrows(KafkaException.class, () -> coordinator.initWithCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE)));
}
@Test
public void testRefreshOffsetNotCoordinatorForConsumer() {
@ParameterizedTest
@CsvSource({
"NOT_COORDINATOR, true",
"COORDINATOR_NOT_AVAILABLE, true",
"COORDINATOR_LOAD_IN_PROGRESS, false",
"NETWORK_EXCEPTION, false",
})
public void testRefreshOffsetRetriableErrorCoordinatorLookup(Errors error, boolean expectCoordinatorRelookup) {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
subscriptions.assignFromUser(singleton(t1p));
client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.prepareResponse(offsetFetchResponse(error, Collections.emptyMap()));
if (expectCoordinatorRelookup) {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
}
client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
coordinator.initWithCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));