KAFKA-17267; Don't return REQUEST_TIMED_OUT for OFFSET_FETCHes (#16825)

When handling an OFFSET_FETCH request requiring stable offsets, the new
group coordinator may encounter a timeout under some circumstances, such
as a zombie coordinator or a lagging __consumer_offsets replica that has
not yet dropped out of the ISR. Existing and older clients do not expect
the REQUEST_TIMED_OUT error code and will not retry on it, so remap it to
NOT_COORDINATOR to trigger a coordinator lookup and retry.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Sean Quah 2024-08-09 09:06:38 +01:00 committed by GitHub
parent 8ce514a52e
commit 7a8edffad1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 152 additions and 2 deletions

View File

@ -825,7 +825,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
Collections.emptyList(), Collections.emptyList(),
coordinator.fetchOffsets(request, Long.MAX_VALUE) coordinator.fetchOffsets(request, Long.MAX_VALUE)
) )
); ).exceptionally(exception -> handleOffsetFetchException(
"fetch-offsets",
request,
exception
));
} else { } else {
return runtime.scheduleReadOperation( return runtime.scheduleReadOperation(
"fetch-offsets", "fetch-offsets",
@ -876,7 +880,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
Collections.emptyList(), Collections.emptyList(),
coordinator.fetchAllOffsets(request, Long.MAX_VALUE) coordinator.fetchAllOffsets(request, Long.MAX_VALUE)
) )
); ).exceptionally(exception -> handleOffsetFetchException(
"fetch-all-offsets",
request,
exception
));
} else { } else {
return runtime.scheduleReadOperation( return runtime.scheduleReadOperation(
"fetch-all-offsets", "fetch-all-offsets",
@ -1217,4 +1225,49 @@ public class GroupCoordinatorService implements GroupCoordinator {
return handler.apply(apiError.error(), apiError.message()); return handler.apply(apiError.error(), apiError.message());
} }
} }
/**
 * Handler used by offset fetch operations to convert errors raised during the
 * operation into coordinator errors on the response. Unexpected errors are
 * logged and handled by the shared operation exception handler.
 *
 * @param operationName The name of the operation.
 * @param request       The OffsetFetchRequestGroup request.
 * @param exception     The exception to handle.
 * @return The OffsetFetchRequestGroup response.
 */
private OffsetFetchResponseData.OffsetFetchResponseGroup handleOffsetFetchException(
    String operationName,
    OffsetFetchRequestData.OffsetFetchRequestGroup request,
    Throwable exception
) {
    ApiError apiError = ApiError.fromThrowable(exception);
    boolean remapToNotCoordinator =
        apiError.error() == Errors.UNKNOWN_TOPIC_OR_PARTITION
            || apiError.error() == Errors.NOT_ENOUGH_REPLICAS
            || apiError.error() == Errors.REQUEST_TIMED_OUT;

    if (remapToNotCoordinator) {
        // Remap REQUEST_TIMED_OUT to NOT_COORDINATOR, since consumers on versions prior
        // to 3.9 do not expect the error and won't retry the request. NOT_COORDINATOR
        // additionally triggers coordinator re-lookup, which is necessary if the client is
        // talking to a zombie coordinator.
        //
        // While handleOperationException does remap UNKNOWN_TOPIC_OR_PARTITION,
        // NOT_ENOUGH_REPLICAS and REQUEST_TIMED_OUT to COORDINATOR_NOT_AVAILABLE,
        // COORDINATOR_NOT_AVAILABLE is also not handled by consumers on versions prior to
        // 3.9.
        return new OffsetFetchResponseData.OffsetFetchResponseGroup()
            .setGroupId(request.groupId())
            .setErrorCode(Errors.NOT_COORDINATOR.code());
    }

    // All other errors go through the common handler, which logs unexpected ones.
    return handleOperationException(
        operationName,
        request,
        exception,
        (error, message) -> new OffsetFetchResponseData.OffsetFetchResponseGroup()
            .setGroupId(request.groupId())
            .setErrorCode(error.code())
    );
}
} }

View File

@ -1162,6 +1162,56 @@ public class GroupCoordinatorServiceTest {
); );
} }
@ParameterizedTest
@CsvSource({
    "UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR",
    "NOT_ENOUGH_REPLICAS, NOT_COORDINATOR",
    "REQUEST_TIMED_OUT, NOT_COORDINATOR",
    "NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR",
    "KAFKA_STORAGE_ERROR, NOT_COORDINATOR",
})
public void testFetchOffsetsWithWrappedError(
    Errors error,
    Errors expectedError
) throws ExecutionException, InterruptedException {
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> mockedRuntime = mockRuntime();
    GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
        new LogContext(),
        createConfig(),
        mockedRuntime,
        new GroupCoordinatorMetrics()
    );
    coordinatorService.startup(() -> 1);

    // A single-partition fetch for topic "foo" in group "group".
    OffsetFetchRequestData.OffsetFetchRequestGroup requestGroup =
        new OffsetFetchRequestData.OffsetFetchRequestGroup()
            .setGroupId("group")
            .setTopics(Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Collections.singletonList(0))));

    // Make the write operation fail with the parameterized error wrapped in a
    // CompletionException, as the runtime would surface it.
    when(mockedRuntime.scheduleWriteOperation(
        ArgumentMatchers.eq("fetch-offsets"),
        ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
        ArgumentMatchers.eq(Duration.ofMillis(5000)),
        ArgumentMatchers.any()
    )).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception())));

    CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> responseFuture =
        coordinatorService.fetchOffsets(
            requestContext(ApiKeys.OFFSET_FETCH),
            requestGroup,
            true
        );

    // The error must be remapped to the expected coordinator error on the response.
    assertEquals(
        new OffsetFetchResponseData.OffsetFetchResponseGroup()
            .setGroupId("group")
            .setErrorCode(expectedError.code()),
        responseFuture.get()
    );
}
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @ValueSource(booleans = {true, false})
public void testFetchAllOffsets( public void testFetchAllOffsets(
@ -1245,6 +1295,53 @@ public class GroupCoordinatorServiceTest {
); );
} }
@ParameterizedTest
@CsvSource({
    "UNKNOWN_TOPIC_OR_PARTITION, NOT_COORDINATOR",
    "NOT_ENOUGH_REPLICAS, NOT_COORDINATOR",
    "REQUEST_TIMED_OUT, NOT_COORDINATOR",
    "NOT_LEADER_OR_FOLLOWER, NOT_COORDINATOR",
    "KAFKA_STORAGE_ERROR, NOT_COORDINATOR",
})
public void testFetchAllOffsetsWithWrappedError(
    Errors error,
    Errors expectedError
) throws ExecutionException, InterruptedException {
    CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> mockedRuntime = mockRuntime();
    GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
        new LogContext(),
        createConfig(),
        mockedRuntime,
        new GroupCoordinatorMetrics()
    );
    coordinatorService.startup(() -> 1);

    // A fetch-all request carries only the group id, no explicit topics.
    OffsetFetchRequestData.OffsetFetchRequestGroup requestGroup =
        new OffsetFetchRequestData.OffsetFetchRequestGroup()
            .setGroupId("group");

    // Make the write operation fail with the parameterized error wrapped in a
    // CompletionException, as the runtime would surface it.
    when(mockedRuntime.scheduleWriteOperation(
        ArgumentMatchers.eq("fetch-all-offsets"),
        ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
        ArgumentMatchers.eq(Duration.ofMillis(5000)),
        ArgumentMatchers.any()
    )).thenReturn(FutureUtils.failedFuture(new CompletionException(error.exception())));

    CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> responseFuture =
        coordinatorService.fetchAllOffsets(
            requestContext(ApiKeys.OFFSET_FETCH),
            requestGroup,
            true
        );

    // The error must be remapped to the expected coordinator error on the response.
    assertEquals(
        new OffsetFetchResponseData.OffsetFetchResponseGroup()
            .setGroupId("group")
            .setErrorCode(expectedError.code()),
        responseFuture.get()
    );
}
@Test @Test
public void testLeaveGroup() throws Exception { public void testLeaveGroup() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();