KAFKA-16815: Handle FencedInstanceId in HB response (#16047)

Handle FencedInstanceIdException that a consumer may receive in the heartbeat response. This will be the case when a static consumer is removed from the group by and admin client, and another member joins with the same group.instance.id (allowed in). The first member will receive a FencedInstanceId on its next heartbeat. The expectation is that this should be handled as a fatal error.

There are no actual changes in logic with this PR, given that without being handled, the FencedInstanceId was being treated as an "unexpected error", which are all treated as fatal errors, so the outcome remains the same. But we're introducing this small change just for accuracy in the logic and the logs: FencedInstanceId is expected during heartbeat, a log line is shown describing the situation and why it happened (and it's treated as a fatal error, just like it was before this PR).

This PR also improves the test to ensure that the error propagated to the app thread matches the one received in the HB.

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
Lianet Magrans 2024-05-24 14:19:43 +02:00 committed by GitHub
parent c5cd190818
commit 0143c72e50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 5 deletions

View File

@ -383,7 +383,15 @@ public class HeartbeatRequestManager implements RequestManager {
case UNRELEASED_INSTANCE_ID:
logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}",
membershipManager.groupInstanceId().orElse("null"), errorMessage);
handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
handleFatalFailure(error.exception(errorMessage));
break;
case FENCED_INSTANCE_ID:
logger.error("GroupHeartbeatRequest failed due to fenced instance id {}: {}. " +
"This is expected in the case that the member was removed from the group " +
"by an admin client, and another member joined using the same group instance id.",
membershipManager.groupInstanceId().orElse("null"), errorMessage);
handleFatalFailure(error.exception(errorMessage));
break;
case INVALID_REQUEST:

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.Heart
import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.HeartbeatState;
import org.apache.kafka.clients.consumer.internals.MembershipManager.LocalAssignment;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
@ -51,6 +52,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.Collection;
@ -483,8 +485,7 @@ public class HeartbeatRequestManagerTest {
break;
default:
if (isFatal) {
// The memberStateManager should have stopped heartbeat at this point
ensureFatalError();
ensureFatalError(error);
} else {
verify(backgroundEventHandler, never()).add(any());
assertNextHeartbeatTiming(0);
@ -781,9 +782,15 @@ public class HeartbeatRequestManagerTest {
assertEquals(MemberState.STABLE, membershipManager.state());
}
private void ensureFatalError() {
private void ensureFatalError(Errors expectedError) {
verify(membershipManager).transitionToFatal();
verify(backgroundEventHandler).add(any());
final ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
assertInstanceOf(expectedError.exception().getClass(), errorEvent.error(),
"The fatal error propagated to the app thread does not match the error received in the heartbeat response.");
ensureHeartbeatStopped();
}
@ -808,6 +815,7 @@ public class HeartbeatRequestManagerTest {
Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true),
Arguments.of(Errors.UNSUPPORTED_VERSION, true),
Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true),
Arguments.of(Errors.FENCED_INSTANCE_ID, true),
Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true));
}