diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java index 21d598afb4c..f46b6f72c87 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java @@ -54,6 +54,9 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage public static final String SHARE_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the share group protocol. " + "To use share groups, the cluster must have the share group protocol enabled."; + public static final String SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG = "The cluster does not support the share group protocol " + + "using ShareGroupHeartbeat API version 1 or later. This version of the API was introduced in Apache Kafka v4.1."; + public ShareHeartbeatRequestManager( final LogContext logContext, final Time time, @@ -93,8 +96,8 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage public boolean handleSpecificFailure(Throwable exception) { boolean errorHandled = false; if (exception instanceof UnsupportedVersionException) { - logger.error("{} failed due to {}: {}", heartbeatRequestName(), exception.getMessage(), SHARE_PROTOCOL_NOT_SUPPORTED_MSG); - handleFatalFailure(new UnsupportedVersionException(SHARE_PROTOCOL_NOT_SUPPORTED_MSG, exception)); + logger.error("{} failed due to {}: {}", heartbeatRequestName(), exception.getMessage(), SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG); + handleFatalFailure(new UnsupportedVersionException(SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG, exception)); errorHandled = true; } return errorHandled; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java index 856cdd29493..8952271b250 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java @@ -60,6 +60,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG; +import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -439,7 +440,7 @@ public class ShareHeartbeatRequestManagerTest { } @ParameterizedTest - @ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG}) + @ValueSource(strings = {SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG}) public void testUnsupportedVersionGeneratedOnTheClient(String errorMsg) { mockResponseWithException(new UnsupportedVersionException(errorMsg), false); @@ -692,11 +693,11 @@ public class ShareHeartbeatRequestManagerTest { private ClientResponse createHeartbeatResponseWithException( final NetworkClientDelegate.UnsentRequest request, final UnsupportedVersionException exception, - final boolean isFromClient + final boolean isFromBroker ) { ShareGroupHeartbeatResponse response = null; - if (!isFromClient) { - response = new ShareGroupHeartbeatResponse(null); + if (isFromBroker) { + response = new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code())); } return new ClientResponse( new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT, ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), "client-id", 1), @@ -705,7 +706,7 @@ public class ShareHeartbeatRequestManagerTest { time.milliseconds(), time.milliseconds(), false, - exception, + isFromBroker ? null : exception, null, response); }