diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index f65a0642a43..a956ef3a939 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -383,7 +383,15 @@ public class HeartbeatRequestManager implements RequestManager { case UNRELEASED_INSTANCE_ID: logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}", membershipManager.groupInstanceId().orElse("null"), errorMessage); - handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage)); + handleFatalFailure(error.exception(errorMessage)); + break; + + case FENCED_INSTANCE_ID: + logger.error("GroupHeartbeatRequest failed due to fenced instance id {}: {}. " + + "This is expected in the case that the member was removed from the group " + + "by an admin client, and another member joined using the same group instance id.", + membershipManager.groupInstanceId().orElse("null"), errorMessage); + handleFatalFailure(error.exception(errorMessage)); break; case INVALID_REQUEST: diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index f7a73dbae8a..8334fb23605 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.Heart import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.HeartbeatState; import org.apache.kafka.clients.consumer.internals.MembershipManager.LocalAssignment; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.Uuid; @@ -51,6 +52,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import java.util.Arrays; import java.util.Collection; @@ -483,8 +485,7 @@ public class HeartbeatRequestManagerTest { break; default: if (isFatal) { - // The memberStateManager should have stopped heartbeat at this point - ensureFatalError(); + ensureFatalError(error); } else { verify(backgroundEventHandler, never()).add(any()); assertNextHeartbeatTiming(0); @@ -781,9 +782,15 @@ public class HeartbeatRequestManagerTest { assertEquals(MemberState.STABLE, membershipManager.state()); } - private void ensureFatalError() { + private void ensureFatalError(Errors expectedError) { verify(membershipManager).transitionToFatal(); - verify(backgroundEventHandler).add(any()); + + final ArgumentCaptor errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); + verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); + ErrorEvent errorEvent = errorEventArgumentCaptor.getValue(); + assertInstanceOf(expectedError.exception().getClass(), errorEvent.error(), + "The fatal error propagated to the app thread does not match the error received in the heartbeat response."); + ensureHeartbeatStopped(); } @@ -808,6 +815,7 @@ public class HeartbeatRequestManagerTest { Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true), Arguments.of(Errors.UNSUPPORTED_VERSION, true), Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true), + Arguments.of(Errors.FENCED_INSTANCE_ID, true), Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true)); }