MINOR: Improve client error messages for share groups not enabled (#19688)
CI / build (push) Waiting to run Details

As mentioned in
https://github.com/apache/kafka/pull/19378#pullrequestreview-2775598123,
the error messages for a 4.1 share consumer could be clearer for the
different cases for when it cannot successfully join a share group.

This PR uses different error messages for the different cases such as
out-of-date cluster or share groups just not enabled.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2025-05-13 10:42:40 +01:00 committed by GitHub
parent 6eafe407bd
commit 86baac103b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 11 additions and 7 deletions

View File

@ -54,6 +54,9 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage
public static final String SHARE_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the share group protocol. " + public static final String SHARE_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the share group protocol. " +
"To use share groups, the cluster must have the share group protocol enabled."; "To use share groups, the cluster must have the share group protocol enabled.";
public static final String SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG = "The cluster does not support the share group protocol " +
"using ShareGroupHeartbeat API version 1 or later. This version of the API was introduced in Apache Kafka v4.1.";
public ShareHeartbeatRequestManager( public ShareHeartbeatRequestManager(
final LogContext logContext, final LogContext logContext,
final Time time, final Time time,
@ -93,8 +96,8 @@ public class ShareHeartbeatRequestManager extends AbstractHeartbeatRequestManage
public boolean handleSpecificFailure(Throwable exception) { public boolean handleSpecificFailure(Throwable exception) {
boolean errorHandled = false; boolean errorHandled = false;
if (exception instanceof UnsupportedVersionException) { if (exception instanceof UnsupportedVersionException) {
logger.error("{} failed due to {}: {}", heartbeatRequestName(), exception.getMessage(), SHARE_PROTOCOL_NOT_SUPPORTED_MSG); logger.error("{} failed due to {}: {}", heartbeatRequestName(), exception.getMessage(), SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG);
handleFatalFailure(new UnsupportedVersionException(SHARE_PROTOCOL_NOT_SUPPORTED_MSG, exception)); handleFatalFailure(new UnsupportedVersionException(SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG, exception));
errorHandled = true; errorHandled = true;
} }
return errorHandled; return errorHandled;

View File

@ -60,6 +60,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG; import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG;
import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -439,7 +440,7 @@ public class ShareHeartbeatRequestManagerTest {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG}) @ValueSource(strings = {SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG})
public void testUnsupportedVersionGeneratedOnTheClient(String errorMsg) { public void testUnsupportedVersionGeneratedOnTheClient(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg), false); mockResponseWithException(new UnsupportedVersionException(errorMsg), false);
@ -692,11 +693,11 @@ public class ShareHeartbeatRequestManagerTest {
private ClientResponse createHeartbeatResponseWithException( private ClientResponse createHeartbeatResponseWithException(
final NetworkClientDelegate.UnsentRequest request, final NetworkClientDelegate.UnsentRequest request,
final UnsupportedVersionException exception, final UnsupportedVersionException exception,
final boolean isFromClient final boolean isFromBroker
) { ) {
ShareGroupHeartbeatResponse response = null; ShareGroupHeartbeatResponse response = null;
if (!isFromClient) { if (isFromBroker) {
response = new ShareGroupHeartbeatResponse(null); response = new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
} }
return new ClientResponse( return new ClientResponse(
new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT, ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), "client-id", 1), new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT, ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
@ -705,7 +706,7 @@ public class ShareHeartbeatRequestManagerTest {
time.milliseconds(), time.milliseconds(),
time.milliseconds(), time.milliseconds(),
false, false,
exception, isFromBroker ? null : exception,
null, null,
response); response);
} }