KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB (#18101)

Add specific error handling for unsupported version in share consumer and consumer

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Peter Lee 2024-12-28 23:39:58 +08:00 committed by GitHub
parent bc7a1a8969
commit be4d1a6277
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 184 additions and 40 deletions

View File

@ -25,8 +25,6 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
@ -317,31 +315,14 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
heartbeatRequestState.remainingBackoffMs(responseTimeMs),
exception.getMessage());
logger.debug(message);
} else {
} else if (!handleSpecificFailure(exception)) {
logger.error("{} failed due to fatal error: {}", heartbeatRequestName(), exception.getMessage());
if (isHBApiUnsupportedErrorMsg(exception)) {
// This is expected to be the case where building the request fails because the node does not support
// the API. Propagate custom message.
handleFatalFailure(new UnsupportedVersionException(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, exception));
} else {
// This is the case where building the request fails even though the node supports the API (ex.
// required version 1 not available when regex in use).
handleFatalFailure(exception);
}
handleFatalFailure(exception);
}
// Notify the group manager about the failure after all errors have been handled and propagated.
membershipManager().onHeartbeatFailure(exception instanceof RetriableException);
}
/***
* @return True if the exception is the UnsupportedVersion generated on the client, before sending the request,
* when checking if the API is available on the broker.
*/
private boolean isHBApiUnsupportedErrorMsg(Throwable exception) {
return exception instanceof UnsupportedVersionException &&
exception.getMessage().equals("The node does not support " + ApiKeys.CONSUMER_GROUP_HEARTBEAT);
}
private void onResponse(final R response, final long currentTimeMs) {
if (errorForResponse(response) == Errors.NONE) {
heartbeatRequestState.updateHeartbeatIntervalMs(heartbeatIntervalForResponse(response));
@ -404,14 +385,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
handleFatalFailure(error.exception(errorMessage));
break;
case UNSUPPORTED_VERSION:
// Broker responded with HB not supported, meaning the new protocol is not enabled, so propagate
// custom message for it. Note that the case where the protocol is not supported at all should fail
// on the client side when building the request and checking supporting APIs (handled on onFailure).
logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG));
break;
case FENCED_MEMBER_EPOCH:
message = String.format("%s failed for member %s because epoch %s is fenced.",
heartbeatRequestName(), membershipManager().memberId(), membershipManager().memberEpoch());
@ -437,7 +410,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
break;
default:
if (!handleSpecificError(response, currentTimeMs)) {
if (!handleSpecificExceptionInResponse(response, currentTimeMs)) {
// If the manager receives an unknown error - there could be a bug in the code or a new error code
logger.error("{} failed due to unexpected error {}: {}", heartbeatRequestName(), error, errorMessage);
handleFatalFailure(error.exception(errorMessage));
@ -461,15 +434,25 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
membershipManager().transitionToFatal();
}
/**
* Error handling specific failure to a group type when sending the request
* and no response has been received.
*
* @param exception The exception thrown building the request
* @return true if the error was handled, else false
*/
public boolean handleSpecificFailure(Throwable exception) {
return false;
}
/**
* Error handling specific to a group type.
* Error handling specific response exception to a group type.
*
* @param response The heartbeat response
* @param currentTimeMs Current time
* @return true if the error was handled, else false
*/
public boolean handleSpecificError(final R response, final long currentTimeMs) {
public boolean handleSpecificExceptionInResponse(final R response, final long currentTimeMs) {
return false;
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@ -38,6 +39,8 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
/**
* This is the heartbeat request manager for consumer groups.
*
@ -91,12 +94,43 @@ public class ConsumerHeartbeatRequestManager extends AbstractHeartbeatRequestMan
* {@inheritDoc}
*/
@Override
public boolean handleSpecificError(final ConsumerGroupHeartbeatResponse response, final long currentTimeMs) {
public boolean handleSpecificFailure(Throwable exception) {
boolean errorHandled = false;
String errorMessage = exception.getMessage();
if (exception instanceof UnsupportedVersionException) {
String message = CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
if (errorMessage.equals(REGEX_RESOLUTION_NOT_SUPPORTED_MSG)) {
message = REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
logger.error("{} regex resolution not supported: {}", heartbeatRequestName(), message);
} else {
logger.error("{} failed due to unsupported version while sending request: {}", heartbeatRequestName(), errorMessage);
}
handleFatalFailure(new UnsupportedVersionException(message, exception));
errorHandled = true;
}
return errorHandled;
}
/**
* {@inheritDoc}
*/
@Override
public boolean handleSpecificExceptionInResponse(final ConsumerGroupHeartbeatResponse response, final long currentTimeMs) {
Errors error = errorForResponse(response);
String errorMessage = errorMessageForResponse(response);
boolean errorHandled;
switch (error) {
// Broker responded with HB not supported, meaning the new protocol is not enabled, so propagate
// custom message for it. Note that the case where the protocol is not supported at all should fail
// on the client side when building the request and checking supporting APIs (handled on onFailure).
case UNSUPPORTED_VERSION:
logger.error("{} failed due to unsupported version response on broker side: {}",
heartbeatRequestName(), CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG);
handleFatalFailure(error.exception(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG));
errorHandled = true;
break;
case UNRELEASED_INSTANCE_ID:
logger.error("{} failed due to unreleased instance id {}: {}",
heartbeatRequestName(), membershipManager.groupInstanceId().orElse("null"), errorMessage);

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@ -50,6 +51,9 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage
*/
private final HeartbeatState heartbeatState;
public static final String SHARE_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the share group protocol. " +
"To use share groups, the cluster must have the share group protocol enabled.";
public ShareHeartbeatRequestManager(
final LogContext logContext,
final Time time,
@ -82,6 +86,45 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage
this.heartbeatState = heartbeatState;
}
/**
* {@inheritDoc}
*/
@Override
public boolean handleSpecificFailure(Throwable exception) {
boolean errorHandled = false;
if (exception instanceof UnsupportedVersionException) {
logger.error("{} failed due to {}: {}", heartbeatRequestName(), exception.getMessage(), SHARE_PROTOCOL_NOT_SUPPORTED_MSG);
handleFatalFailure(new UnsupportedVersionException(SHARE_PROTOCOL_NOT_SUPPORTED_MSG, exception));
errorHandled = true;
}
return errorHandled;
}
/**
* {@inheritDoc}
*/
@Override
public boolean handleSpecificExceptionInResponse(final ShareGroupHeartbeatResponse response, final long currentTimeMs) {
Errors error = errorForResponse(response);
boolean errorHandled;
switch (error) {
// Broker responded with HB not supported, meaning the new protocol is not enabled, so propagate
// custom message for it. Note that the case where the protocol is not supported at all should fail
// on the client side when building the request and checking supporting APIs (handled on onFailure).
case UNSUPPORTED_VERSION:
logger.error("{} failed due to unsupported version: {}",
heartbeatRequestName(), SHARE_PROTOCOL_NOT_SUPPORTED_MSG);
handleFatalFailure(error.exception(SHARE_PROTOCOL_NOT_SUPPORTED_MSG));
errorHandled = true;
break;
default:
errorHandled = false;
}
return errorHandled;
}
/**
* {@inheritDoc}
*/

View File

@ -611,9 +611,25 @@ public class ConsumerHeartbeatRequestManagerTest {
* 2. Required HB API version is not available.
*/
@ParameterizedTest
@ValueSource(strings = {CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG})
public void testUnsupportedVersionFromBroker(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg), true);
ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
assertEquals(errorMsg, errorEvent.error().getMessage());
clearInvocations(backgroundEventHandler);
}
/**
* This validates the UnsupportedApiVersion the client generates while building a HB if:
* REGEX_RESOLUTION_NOT_SUPPORTED_MSG only generated on the client side.
*/
@ParameterizedTest
@ValueSource(strings = {CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, REGEX_RESOLUTION_NOT_SUPPORTED_MSG})
public void testUnsupportedVersion(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg));
public void testUnsupportedVersionFromClient(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg), false);
ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
@ -633,14 +649,14 @@ public class ConsumerHeartbeatRequestManagerTest {
result.unsentRequests.get(0).handler().onComplete(response);
}
private void mockResponseWithException(UnsupportedVersionException exception) {
private void mockResponseWithException(UnsupportedVersionException exception, boolean isFromBroker) {
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
ClientResponse response = createHeartbeatResponseWithException(
result.unsentRequests.get(0), exception);
result.unsentRequests.get(0), exception, isFromBroker);
result.unsentRequests.get(0).handler().onComplete(response);
}
@ -1044,9 +1060,13 @@ public class ConsumerHeartbeatRequestManagerTest {
private ClientResponse createHeartbeatResponseWithException(
final NetworkClientDelegate.UnsentRequest request,
final UnsupportedVersionException exception
final UnsupportedVersionException exception,
final boolean isFromBroker
) {
ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(null);
ConsumerGroupHeartbeatResponse response = null;
if (isFromBroker) {
response = new ConsumerGroupHeartbeatResponse(null);
}
return new ClientResponse(
new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
request.handler(),

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
@ -58,6 +59,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -67,6 +69,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -363,7 +366,7 @@ public class ShareHeartbeatRequestManagerTest {
@ParameterizedTest
@MethodSource("errorProvider")
public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) {
// Handling errors on the second heartbeat
// Handling errors on the second heartbeat
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
@ -422,6 +425,46 @@ public class ShareHeartbeatRequestManagerTest {
}
}
@ParameterizedTest
@ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG})
public void testUnsupportedVersionGeneratedOnTheBroker(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg), true);
ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
assertEquals(errorMsg, errorEvent.error().getMessage());
clearInvocations(backgroundEventHandler);
}
@ParameterizedTest
@ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG})
public void testUnsupportedVersionGeneratedOnTheClient(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg), false);
ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
assertEquals(errorMsg, errorEvent.error().getMessage());
clearInvocations(backgroundEventHandler);
}
private void mockResponseWithException(UnsupportedVersionException exception, boolean isFromBroker) {
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());
// Manually completing the response to test error handling
when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
ClientResponse response = createHeartbeatResponseWithException(
result.unsentRequests.get(0),
exception,
isFromBroker);
result.unsentRequests.get(0).handler().onComplete(response);
}
@Test
public void testHeartbeatState() {
mockJoiningMemberData();
@ -646,6 +689,27 @@ public class ShareHeartbeatRequestManagerTest {
response);
}
private ClientResponse createHeartbeatResponseWithException(
final NetworkClientDelegate.UnsentRequest request,
final UnsupportedVersionException exception,
final boolean isFromClient
) {
ShareGroupHeartbeatResponse response = null;
if (!isFromClient) {
response = new ShareGroupHeartbeatResponse(null);
}
return new ClientResponse(
new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT, ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
request.handler(),
"0",
time.milliseconds(),
time.milliseconds(),
false,
exception,
null,
response);
}
private ConsumerConfig config() {
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);