From 86baac103b7d3910952307006059555eb173fdd5 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Tue, 13 May 2025 10:42:40 +0100 Subject: [PATCH] MINOR: Improve client error messages for share groups not enabled (#19688) As mentioned in https://github.com/apache/kafka/pull/19378#pullrequestreview-2775598123, the error messages for a 4.1 share consumer could be clearer for the different cases for when it cannot successfully join a share group. This PR uses different error messages for the different cases such as out-of-date cluster or share groups just not enabled. Reviewers: Apoorv Mittal --- .../internals/ShareHeartbeatRequestManager.java | 7 +++++-- .../internals/ShareHeartbeatRequestManagerTest.java | 11 ++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java index 21d598afb4c..f46b6f72c87 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java @@ -54,6 +54,9 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage public static final String SHARE_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the share group protocol. " + "To use share groups, the cluster must have the share group protocol enabled."; + public static final String SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG = "The cluster does not support the share group protocol " + + "using ShareGroupHeartbeat API version 1 or later. This version of the API was introduced in Apache Kafka v4.1."; + public ShareHeartbeatRequestManager( final LogContext logContext, final Time time, @@ -93,8 +96,8 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage public boolean handleSpecificFailure(Throwable exception) { boolean errorHandled = false; if (exception instanceof UnsupportedVersionException) { - logger.error("{} failed due to {}: {}", heartbeatRequestName(), exception.getMessage(), SHARE_PROTOCOL_NOT_SUPPORTED_MSG); - handleFatalFailure(new UnsupportedVersionException(SHARE_PROTOCOL_NOT_SUPPORTED_MSG, exception)); + logger.error("{} failed due to {}: {}", heartbeatRequestName(), exception.getMessage(), SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG); + handleFatalFailure(new UnsupportedVersionException(SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG, exception)); errorHandled = true; } return errorHandled; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java index 856cdd29493..8952271b250 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java @@ -60,6 +60,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG; +import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -439,7 +440,7 @@ public class ShareHeartbeatRequestManagerTest { } @ParameterizedTest - @ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG}) + @ValueSource(strings = {SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG}) public void testUnsupportedVersionGeneratedOnTheClient(String errorMsg) { mockResponseWithException(new UnsupportedVersionException(errorMsg), false); @@ -692,11 +693,11 @@ public class ShareHeartbeatRequestManagerTest { private ClientResponse createHeartbeatResponseWithException( final NetworkClientDelegate.UnsentRequest request, final UnsupportedVersionException exception, - final boolean isFromClient + final boolean isFromBroker ) { ShareGroupHeartbeatResponse response = null; - if (!isFromClient) { - response = new ShareGroupHeartbeatResponse(null); + if (isFromBroker) { + response = new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code())); } return new ClientResponse( new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT, ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), "client-id", 1), @@ -705,7 +706,7 @@ public class ShareHeartbeatRequestManagerTest { time.milliseconds(), time.milliseconds(), false, - exception, + isFromBroker ? null : exception, null, response); }