mirror of https://github.com/apache/kafka.git
KAFKA-18736: Handle errors in the Streams group heartbeat request manager (#19230)
This commit adds error handling to the Streams heartbeat request manager. Errors can occur while sending a heartbeat request and when a response with an error code that is not NONE is received. Some errors are handled explicitly to recover from them or to log specific messages. All the others are handled as fatal errors. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
9db5888609
commit
266532f562
|
|
@ -22,6 +22,9 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
|
|||
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.GroupAuthorizationException;
|
||||
import org.apache.kafka.common.errors.RetriableException;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
|
||||
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
|
|
@ -59,6 +62,11 @@ import java.util.stream.IntStream;
|
|||
*/
|
||||
public class StreamsGroupHeartbeatRequestManager implements RequestManager {
|
||||
|
||||
private static final String UNSUPPORTED_VERSION_ERROR_MESSAGE = "The cluster does not support the STREAMS group " +
|
||||
"protocol or does not support the versions of the STREAMS group protocol used by this client " +
|
||||
"(used versions: " + StreamsGroupHeartbeatRequestData.LOWEST_SUPPORTED_VERSION + " to " +
|
||||
StreamsGroupHeartbeatRequestData.HIGHEST_SUPPORTED_VERSION + ").";
|
||||
|
||||
static class HeartbeatState {
|
||||
|
||||
// Fields of StreamsGroupHeartbeatRequest sent in the most recent request
|
||||
|
|
@ -409,6 +417,8 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
|
|||
if (response != null) {
|
||||
metricsManager.recordRequestLatency(response.requestLatencyMs());
|
||||
onResponse((StreamsGroupHeartbeatResponse) response.responseBody(), completionTimeMs);
|
||||
} else {
|
||||
onFailure(exception, completionTimeMs);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
@ -428,6 +438,8 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
|
|||
private void onResponse(final StreamsGroupHeartbeatResponse response, long currentTimeMs) {
|
||||
if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
|
||||
onSuccessResponse(response, currentTimeMs);
|
||||
} else {
|
||||
onErrorResponse(response, currentTimeMs);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -451,6 +463,146 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
|
|||
membershipManager.onHeartbeatSuccess(response);
|
||||
}
|
||||
|
||||
private void onErrorResponse(final StreamsGroupHeartbeatResponse response, final long currentTimeMs) {
|
||||
final Errors error = Errors.forCode(response.data().errorCode());
|
||||
final String errorMessage = response.data().errorMessage();
|
||||
|
||||
heartbeatState.reset();
|
||||
this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
|
||||
|
||||
switch (error) {
|
||||
case NOT_COORDINATOR:
|
||||
logInfo(
|
||||
String.format("StreamsGroupHeartbeatRequest failed because the group coordinator %s is incorrect. " +
|
||||
"Will attempt to find the coordinator again and retry", coordinatorRequestManager.coordinator()),
|
||||
response,
|
||||
currentTimeMs
|
||||
);
|
||||
coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
|
||||
// Skip backoff so that the next HB is sent as soon as the new coordinator is discovered
|
||||
heartbeatRequestState.reset();
|
||||
break;
|
||||
|
||||
case COORDINATOR_NOT_AVAILABLE:
|
||||
logInfo(
|
||||
String.format("StreamsGroupHeartbeatRequest failed because the group coordinator %s is not available. " +
|
||||
"Will attempt to find the coordinator again and retry", coordinatorRequestManager.coordinator()),
|
||||
response,
|
||||
currentTimeMs
|
||||
);
|
||||
coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
|
||||
// Skip backoff so that the next HB is sent as soon as the new coordinator is discovered
|
||||
heartbeatRequestState.reset();
|
||||
break;
|
||||
|
||||
case COORDINATOR_LOAD_IN_PROGRESS:
|
||||
logInfo(
|
||||
String.format("StreamsGroupHeartbeatRequest failed because the group coordinator %s is still loading. " +
|
||||
"Will retry", coordinatorRequestManager.coordinator()),
|
||||
response,
|
||||
currentTimeMs
|
||||
);
|
||||
break;
|
||||
|
||||
case GROUP_AUTHORIZATION_FAILED:
|
||||
GroupAuthorizationException exception =
|
||||
GroupAuthorizationException.forGroupId(membershipManager.groupId());
|
||||
logger.error("StreamsGroupHeartbeatRequest failed due to group authorization failure: {}",
|
||||
exception.getMessage());
|
||||
handleFatalFailure(error.exception(exception.getMessage()));
|
||||
break;
|
||||
|
||||
case TOPIC_AUTHORIZATION_FAILED:
|
||||
logger.error("StreamsGroupHeartbeatRequest failed for member {} with state {} due to {}: {}",
|
||||
membershipManager.memberId(), membershipManager.state(), error, errorMessage);
|
||||
// Propagate auth error received in HB so that it's returned on poll.
|
||||
// Member should stay in its current state so it can recover if ever the missing ACLs are added.
|
||||
backgroundEventHandler.add(new ErrorEvent(error.exception()));
|
||||
break;
|
||||
|
||||
case INVALID_REQUEST:
|
||||
case GROUP_MAX_SIZE_REACHED:
|
||||
case STREAMS_INVALID_TOPOLOGY:
|
||||
case STREAMS_INVALID_TOPOLOGY_EPOCH:
|
||||
case STREAMS_TOPOLOGY_FENCED:
|
||||
logger.error("StreamsGroupHeartbeatRequest failed due to {}: {}", error, errorMessage);
|
||||
handleFatalFailure(error.exception(errorMessage));
|
||||
break;
|
||||
|
||||
case FENCED_MEMBER_EPOCH:
|
||||
logInfo(
|
||||
String.format("StreamsGroupHeartbeatRequest failed for member %s because epoch %s is fenced.",
|
||||
membershipManager.memberId(), membershipManager.memberEpoch()),
|
||||
response,
|
||||
currentTimeMs
|
||||
);
|
||||
membershipManager.onFenced();
|
||||
// Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment
|
||||
heartbeatRequestState.reset();
|
||||
break;
|
||||
|
||||
case UNKNOWN_MEMBER_ID:
|
||||
logInfo(
|
||||
String.format("StreamsGroupHeartbeatRequest failed because member %s is unknown.",
|
||||
membershipManager.memberId()),
|
||||
response,
|
||||
currentTimeMs
|
||||
);
|
||||
membershipManager.onFenced();
|
||||
// Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment
|
||||
heartbeatRequestState.reset();
|
||||
break;
|
||||
|
||||
case UNSUPPORTED_VERSION:
|
||||
logger.error("StreamsGroupHeartbeatRequest failed due to {}: {}", error, UNSUPPORTED_VERSION_ERROR_MESSAGE);
|
||||
handleFatalFailure(error.exception(UNSUPPORTED_VERSION_ERROR_MESSAGE));
|
||||
break;
|
||||
|
||||
default:
|
||||
logger.error("StreamsGroupHeartbeatRequest failed due to unexpected error {}: {}", error, errorMessage);
|
||||
handleFatalFailure(error.exception(errorMessage));
|
||||
}
|
||||
membershipManager.onFatalHeartbeatFailure();
|
||||
}
|
||||
|
||||
private void logInfo(final String message,
|
||||
final StreamsGroupHeartbeatResponse response,
|
||||
final long currentTimeMs) {
|
||||
logger.info("{} in {}ms: {}",
|
||||
message,
|
||||
heartbeatRequestState.remainingBackoffMs(currentTimeMs),
|
||||
response.data().errorMessage());
|
||||
}
|
||||
|
||||
private void onFailure(final Throwable exception, final long responseTimeMs) {
|
||||
heartbeatRequestState.onFailedAttempt(responseTimeMs);
|
||||
heartbeatState.reset();
|
||||
if (exception instanceof RetriableException) {
|
||||
coordinatorRequestManager.handleCoordinatorDisconnect(exception, responseTimeMs);
|
||||
String message = String.format("StreamsGroupHeartbeatRequest failed because of a retriable exception. Will retry in %s ms: %s",
|
||||
heartbeatRequestState.remainingBackoffMs(responseTimeMs),
|
||||
exception.getMessage());
|
||||
logger.debug(message);
|
||||
membershipManager.onRetriableHeartbeatFailure();
|
||||
} else {
|
||||
if (exception instanceof UnsupportedVersionException) {
|
||||
logger.error("StreamsGroupHeartbeatRequest failed because of an unsupported version exception: {}",
|
||||
exception.getMessage());
|
||||
handleFatalFailure(new UnsupportedVersionException(UNSUPPORTED_VERSION_ERROR_MESSAGE));
|
||||
} else {
|
||||
logger.error("StreamsGroupHeartbeatRequest failed because of a fatal exception while sending request: {}",
|
||||
exception.getMessage());
|
||||
handleFatalFailure(exception);
|
||||
}
|
||||
membershipManager.onFatalHeartbeatFailure();
|
||||
}
|
||||
}
|
||||
|
||||
private void handleFatalFailure(Throwable error) {
|
||||
backgroundEventHandler.add(new ErrorEvent(error));
|
||||
membershipManager.transitionToFatal();
|
||||
}
|
||||
|
||||
private static Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> convertHostInfoMap(final StreamsGroupHeartbeatResponseData data) {
|
||||
Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> partitionsByHost = new HashMap<>();
|
||||
data.partitionsByUserEndpoint().forEach(endpoint -> {
|
||||
|
|
|
|||
|
|
@ -702,14 +702,21 @@ public class StreamsMembershipManager implements RequestManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Notify the member that an error heartbeat response was received.
|
||||
*
|
||||
* @param retriable True if the request failed with a retriable error.
|
||||
* Notify the member that a retriable error heartbeat response was received.
|
||||
*/
|
||||
public void onHeartbeatFailure(boolean retriable) {
|
||||
if (!retriable) {
|
||||
metricsManager.maybeRecordRebalanceFailed();
|
||||
}
|
||||
public void onRetriableHeartbeatFailure() {
|
||||
onHeartbeatFailure();
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify the member that a fatal error heartbeat response was received.
|
||||
*/
|
||||
public void onFatalHeartbeatFailure() {
|
||||
metricsManager.maybeRecordRebalanceFailed();
|
||||
onHeartbeatFailure();
|
||||
}
|
||||
|
||||
private void onHeartbeatFailure() {
|
||||
// The leave group request is sent out once (not retried), so we should complete the leave
|
||||
// operation once the request completes, regardless of the response.
|
||||
if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
|
||||
|
|
|
|||
|
|
@ -22,14 +22,20 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
|
|||
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.DisconnectException;
|
||||
import org.apache.kafka.common.errors.GroupAuthorizationException;
|
||||
import org.apache.kafka.common.errors.TopicAuthorizationException;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
|
||||
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.RequestHeader;
|
||||
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
|
||||
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.utils.LogCaptureAppender;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
|
|
@ -42,10 +48,12 @@ import org.junit.jupiter.params.provider.Arguments;
|
|||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockedConstruction;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
|
@ -61,6 +69,7 @@ import java.util.stream.Stream;
|
|||
import static org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
|
@ -529,7 +538,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
|
||||
verify(membershipManager).onHeartbeatRequestGenerated();
|
||||
final ClientResponse response = buildClientResponse();
|
||||
networkRequest.future().complete(response);
|
||||
networkRequest.handler().onComplete(response);
|
||||
verify(heartbeatRequestState, never()).updateHeartbeatIntervalMs(anyLong());
|
||||
verify(heartbeatRequestState, never()).onSuccessfulAttempt(anyLong());
|
||||
verify(membershipManager, never()).onHeartbeatSuccess(any());
|
||||
|
|
@ -572,7 +581,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue()
|
||||
);
|
||||
final ClientResponse response = buildClientResponse();
|
||||
networkRequest.future().complete(response);
|
||||
networkRequest.handler().onComplete(response);
|
||||
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody());
|
||||
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
|
||||
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
|
||||
|
|
@ -975,6 +984,404 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
assertTrue(requestDataWithShutdownRequest.shutdownApplication());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoordinatorDisconnectFailureWhileSending() {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
time.sleep(1234);
|
||||
final long completionTimeMs = time.milliseconds();
|
||||
final DisconnectException disconnectException = DisconnectException.INSTANCE;
|
||||
networkRequest.handler().onFailure(completionTimeMs, disconnectException);
|
||||
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
|
||||
verify(heartbeatRequestState).onFailedAttempt(completionTimeMs);
|
||||
verify(heartbeatState).reset();
|
||||
verify(coordinatorRequestManager).handleCoordinatorDisconnect(disconnectException, completionTimeMs);
|
||||
verify(membershipManager).onRetriableHeartbeatFailure();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnsupportedVersionFailureWhileSending() {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
time.sleep(1234);
|
||||
final long completionTimeMs = time.milliseconds();
|
||||
final UnsupportedVersionException unsupportedVersionException = new UnsupportedVersionException("message");
|
||||
networkRequest.handler().onFailure(completionTimeMs, unsupportedVersionException);
|
||||
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
|
||||
verify(heartbeatRequestState).onFailedAttempt(completionTimeMs);
|
||||
verify(heartbeatState).reset();
|
||||
verify(membershipManager).onFatalHeartbeatFailure();
|
||||
ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
|
||||
verify(backgroundEventHandler).add(errorEvent.capture());
|
||||
assertEquals(
|
||||
"The cluster does not support the STREAMS group " +
|
||||
"protocol or does not support the versions of the STREAMS group protocol used by this client " +
|
||||
"(used versions: " + StreamsGroupHeartbeatRequestData.LOWEST_SUPPORTED_VERSION + " to " +
|
||||
StreamsGroupHeartbeatRequestData.HIGHEST_SUPPORTED_VERSION + ").",
|
||||
errorEvent.getValue().error().getMessage()
|
||||
);
|
||||
assertInstanceOf(UnsupportedVersionException.class, errorEvent.getValue().error());
|
||||
verify(membershipManager).transitionToFatal();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFatalFailureWhileSending() {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
time.sleep(1234);
|
||||
final long completionTimeMs = time.milliseconds();
|
||||
final RuntimeException fatalException = new RuntimeException();
|
||||
networkRequest.handler().onFailure(completionTimeMs, fatalException);
|
||||
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
|
||||
verify(heartbeatRequestState).onFailedAttempt(completionTimeMs);
|
||||
verify(heartbeatState).reset();
|
||||
verify(membershipManager).onFatalHeartbeatFailure();
|
||||
ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
|
||||
verify(backgroundEventHandler).add(errorEvent.capture());
|
||||
assertEquals(fatalException, errorEvent.getValue().error());
|
||||
verify(membershipManager).transitionToFatal();
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(
|
||||
value = Errors.class,
|
||||
names = {"NOT_COORDINATOR", "COORDINATOR_NOT_AVAILABLE"}
|
||||
)
|
||||
public void testNotCoordinatorAndCoordinatorNotAvailableErrorResponse(final Errors error) {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
time.sleep(1234);
|
||||
final long completionTimeMs = time.milliseconds();
|
||||
final ClientResponse response = buildClientErrorResponse(error, "error message");
|
||||
networkRequest.handler().onComplete(response);
|
||||
verify(coordinatorRequestManager).markCoordinatorUnknown(
|
||||
((StreamsGroupHeartbeatResponse) response.responseBody()).data().errorMessage(),
|
||||
completionTimeMs
|
||||
);
|
||||
verify(heartbeatState).reset();
|
||||
verify(heartbeatRequestState).reset();
|
||||
verify(membershipManager).onFatalHeartbeatFailure();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoordinatorLoadInProgressErrorResponse() {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
final ClientResponse response = buildClientErrorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, "message");
|
||||
networkRequest.handler().onComplete(response);
|
||||
verify(heartbeatState).reset();
|
||||
verify(membershipManager).onFatalHeartbeatFailure();
|
||||
verify(heartbeatRequestState, never()).reset();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupAuthorizationFailedErrorResponse() {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
|
||||
final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
when(membershipManager.groupId()).thenReturn(GROUP_ID);
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
final ClientResponse response = buildClientErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED, "message");
|
||||
networkRequest.handler().onComplete(response);
|
||||
assertTrue(logAppender.getMessages("ERROR").stream()
|
||||
.anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest failed due to group authorization failure: " +
|
||||
"Not authorized to access group: " + GROUP_ID)));
|
||||
verify(heartbeatState).reset();
|
||||
ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
|
||||
verify(backgroundEventHandler).add(errorEvent.capture());
|
||||
assertEquals(
|
||||
GroupAuthorizationException.forGroupId(GROUP_ID).getMessage(),
|
||||
errorEvent.getValue().error().getMessage()
|
||||
);
|
||||
assertInstanceOf(GroupAuthorizationException.class, errorEvent.getValue().error());
|
||||
verify(membershipManager).transitionToFatal();
|
||||
verify(membershipManager).onFatalHeartbeatFailure();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopicAuthorizationFailedErrorResponse() {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
|
||||
final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
when(membershipManager.state()).thenReturn(MemberState.STABLE);
|
||||
when(membershipManager.memberId()).thenReturn(MEMBER_ID);
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
final String errorMessage = "message";
|
||||
final ClientResponse response = buildClientErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, errorMessage);
|
||||
networkRequest.handler().onComplete(response);
|
||||
assertTrue(logAppender.getMessages("ERROR").stream()
|
||||
.anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest failed for member " + MEMBER_ID +
|
||||
" with state " + MemberState.STABLE + " due to " + Errors.TOPIC_AUTHORIZATION_FAILED + ": " +
|
||||
errorMessage)));
|
||||
verify(heartbeatState).reset();
|
||||
ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
|
||||
verify(backgroundEventHandler).add(errorEvent.capture());
|
||||
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.message(), errorEvent.getValue().error().getMessage());
|
||||
assertInstanceOf(TopicAuthorizationException.class, errorEvent.getValue().error());
|
||||
verify(membershipManager).onFatalHeartbeatFailure();
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(
|
||||
value = Errors.class,
|
||||
names = {
|
||||
"INVALID_REQUEST",
|
||||
"GROUP_MAX_SIZE_REACHED",
|
||||
"UNSUPPORTED_VERSION",
|
||||
"STREAMS_INVALID_TOPOLOGY",
|
||||
"STREAMS_INVALID_TOPOLOGY_EPOCH",
|
||||
"STREAMS_TOPOLOGY_FENCED"
|
||||
}
|
||||
)
|
||||
public void testKnownFatalErrorResponse(final Errors error) {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
|
||||
final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
final String errorMessageInResponse = "message";
|
||||
final ClientResponse response = buildClientErrorResponse(error, errorMessageInResponse);
|
||||
networkRequest.handler().onComplete(response);
|
||||
verify(heartbeatState).reset();
|
||||
ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
|
||||
verify(backgroundEventHandler).add(errorEvent.capture());
|
||||
if (error == Errors.UNSUPPORTED_VERSION) {
|
||||
final String errorMessage = "The cluster does not support the STREAMS group " +
|
||||
"protocol or does not support the versions of the STREAMS group protocol used by this client " +
|
||||
"(used versions: " + StreamsGroupHeartbeatRequestData.LOWEST_SUPPORTED_VERSION + " to " +
|
||||
StreamsGroupHeartbeatRequestData.HIGHEST_SUPPORTED_VERSION + ").";
|
||||
assertTrue(logAppender.getMessages("ERROR").stream()
|
||||
.anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest failed due to " +
|
||||
error + ": " + errorMessage)));
|
||||
assertEquals(errorMessage, errorEvent.getValue().error().getMessage());
|
||||
} else {
|
||||
assertTrue(logAppender.getMessages("ERROR").stream()
|
||||
.anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest failed due to " +
|
||||
error + ": " + errorMessageInResponse)));
|
||||
assertEquals(errorMessageInResponse, errorEvent.getValue().error().getMessage());
|
||||
}
|
||||
assertInstanceOf(error.exception().getClass(), errorEvent.getValue().error());
|
||||
verify(membershipManager).transitionToFatal();
|
||||
verify(membershipManager).onFatalHeartbeatFailure();
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(
|
||||
value = Errors.class,
|
||||
names = {"FENCED_MEMBER_EPOCH", "UNKNOWN_MEMBER_ID"}
|
||||
)
|
||||
public void testFencedMemberOrUnknownMemberIdErrorResponse(final Errors error) {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
final String errorMessage = "message";
|
||||
final ClientResponse response = buildClientErrorResponse(error, errorMessage);
|
||||
networkRequest.handler().onComplete(response);
|
||||
verify(heartbeatState).reset();
|
||||
verify(heartbeatRequestState).reset();
|
||||
verify(membershipManager).onFenced();
|
||||
verify(membershipManager).onFatalHeartbeatFailure();
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("provideOtherErrors")
|
||||
public void testOtherErrorResponse(final Errors error) {
|
||||
try (
|
||||
final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
|
||||
HeartbeatRequestState.class,
|
||||
(mock, context) -> {
|
||||
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
|
||||
});
|
||||
final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
|
||||
StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
|
||||
final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
|
||||
) {
|
||||
final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
|
||||
final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
|
||||
|
||||
final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
|
||||
|
||||
assertEquals(1, result.unsentRequests.size());
|
||||
final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
|
||||
final String errorMessage = "message";
|
||||
final ClientResponse response = buildClientErrorResponse(error, errorMessage);
|
||||
networkRequest.handler().onComplete(response);
|
||||
assertTrue(logAppender.getMessages("ERROR").stream()
|
||||
.anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest failed due to unexpected error")));
|
||||
verify(heartbeatState).reset();
|
||||
ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
|
||||
verify(backgroundEventHandler).add(errorEvent.capture());
|
||||
assertEquals(errorMessage, errorEvent.getValue().error().getMessage());
|
||||
assertInstanceOf(error.exception().getClass(), errorEvent.getValue().error());
|
||||
verify(membershipManager).transitionToFatal();
|
||||
verify(membershipManager).onFatalHeartbeatFailure();
|
||||
}
|
||||
}
|
||||
|
||||
private static Stream<Arguments> provideOtherErrors() {
|
||||
final Set<Errors> consideredErrors = Set.of(
|
||||
Errors.NONE,
|
||||
Errors.NOT_COORDINATOR,
|
||||
Errors.COORDINATOR_NOT_AVAILABLE,
|
||||
Errors.COORDINATOR_LOAD_IN_PROGRESS,
|
||||
Errors.GROUP_AUTHORIZATION_FAILED,
|
||||
Errors.TOPIC_AUTHORIZATION_FAILED,
|
||||
Errors.INVALID_REQUEST,
|
||||
Errors.GROUP_MAX_SIZE_REACHED,
|
||||
Errors.FENCED_MEMBER_EPOCH,
|
||||
Errors.UNKNOWN_MEMBER_ID,
|
||||
Errors.UNSUPPORTED_VERSION,
|
||||
Errors.STREAMS_INVALID_TOPOLOGY,
|
||||
Errors.STREAMS_INVALID_TOPOLOGY_EPOCH,
|
||||
Errors.STREAMS_TOPOLOGY_FENCED);
|
||||
return Arrays.stream(Errors.values())
|
||||
.filter(error -> !consideredErrors.contains(error))
|
||||
.map(Arguments::of);
|
||||
}
|
||||
|
||||
private static ConsumerConfig config() {
|
||||
Properties prop = new Properties();
|
||||
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
|
|
@ -1014,6 +1421,24 @@ class StreamsGroupHeartbeatRequestManagerTest {
|
|||
);
|
||||
}
|
||||
|
||||
private ClientResponse buildClientErrorResponse(final Errors error, final String errorMessage) {
|
||||
return new ClientResponse(
|
||||
new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, (short) 1, "", 1),
|
||||
null,
|
||||
"-1",
|
||||
time.milliseconds(),
|
||||
time.milliseconds(),
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
new StreamsGroupHeartbeatResponse(
|
||||
new StreamsGroupHeartbeatResponseData()
|
||||
.setErrorCode(error.code())
|
||||
.setErrorMessage(errorMessage)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private static void assertTaskIdsEquals(final List<StreamsGroupHeartbeatRequestData.TaskIds> expected,
|
||||
final List<StreamsGroupHeartbeatRequestData.TaskIds> actual) {
|
||||
List<StreamsGroupHeartbeatRequestData.TaskIds> sortedExpected = expected.stream()
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ import org.apache.kafka.common.utils.Time;
|
|||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
|
@ -1471,22 +1473,14 @@ public class StreamsMembershipManagerTest {
|
|||
membershipManager.onHeartbeatRequestGenerated();
|
||||
assertFalse(groupLeft.isDone());
|
||||
|
||||
membershipManager.onHeartbeatFailure(true);
|
||||
membershipManager.onRetriableHeartbeatFailure();
|
||||
|
||||
assertTrue(groupLeft.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnHeartbeatFatalFailure() {
|
||||
testOnHeartbeatFailure(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnHeartbeatRetriableFailure() {
|
||||
testOnHeartbeatFailure(true);
|
||||
}
|
||||
|
||||
private void testOnHeartbeatFailure(boolean retriable) {
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testOnHeartbeatFailure(boolean retriable) {
|
||||
final MetricName failedRebalanceTotalMetricName = metrics.metricName(
|
||||
"failed-rebalance-total",
|
||||
CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX
|
||||
|
|
@ -1503,7 +1497,11 @@ public class StreamsMembershipManagerTest {
|
|||
final double failedRebalancesTotalBefore = (double) metrics.metric(failedRebalanceTotalMetricName).metricValue();
|
||||
assertEquals(0L, failedRebalancesTotalBefore);
|
||||
|
||||
membershipManager.onHeartbeatFailure(retriable);
|
||||
if (retriable) {
|
||||
membershipManager.onRetriableHeartbeatFailure();
|
||||
} else {
|
||||
membershipManager.onFatalHeartbeatFailure();
|
||||
}
|
||||
|
||||
final double failedRebalancesTotalAfter = (double) metrics.metric(failedRebalanceTotalMetricName).metricValue();
|
||||
assertEquals(retriable ? 0L : 1L, failedRebalancesTotalAfter);
|
||||
|
|
|
|||
Loading…
Reference in New Issue