diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index b260e5e5fbf..1f8ddc725b5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -25,8 +25,6 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.RetriableException; -import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.utils.LogContext; @@ -317,31 +315,14 @@ public abstract class AbstractHeartbeatRequestManager errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); + verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); + ErrorEvent errorEvent = errorEventArgumentCaptor.getValue(); + assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error()); + assertEquals(errorMsg, errorEvent.error().getMessage()); + clearInvocations(backgroundEventHandler); + } + + /** + * This validates the UnsupportedApiVersion the client generates while building a HB if: + * REGEX_RESOLUTION_NOT_SUPPORTED_MSG only generated on the client side. + */ + @ParameterizedTest @ValueSource(strings = {CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, REGEX_RESOLUTION_NOT_SUPPORTED_MSG}) - public void testUnsupportedVersion(String errorMsg) { - mockResponseWithException(new UnsupportedVersionException(errorMsg)); + public void testUnsupportedVersionFromClient(String errorMsg) { + mockResponseWithException(new UnsupportedVersionException(errorMsg), false); ArgumentCaptor errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); ErrorEvent errorEvent = errorEventArgumentCaptor.getValue(); @@ -633,14 +649,14 @@ public class ConsumerHeartbeatRequestManagerTest { result.unsentRequests.get(0).handler().onComplete(response); } - private void mockResponseWithException(UnsupportedVersionException exception) { + private void mockResponseWithException(UnsupportedVersionException exception, boolean isFromBroker) { time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); ClientResponse response = createHeartbeatResponseWithException( - result.unsentRequests.get(0), exception); + result.unsentRequests.get(0), exception, isFromBroker); result.unsentRequests.get(0).handler().onComplete(response); } @@ -1044,9 +1060,13 @@ public class ConsumerHeartbeatRequestManagerTest { private ClientResponse createHeartbeatResponseWithException( final NetworkClientDelegate.UnsentRequest request, - final UnsupportedVersionException exception + final UnsupportedVersionException exception, + final boolean isFromBroker ) { - ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(null); + ConsumerGroupHeartbeatResponse response = null; + if (isFromBroker) { + response = new ConsumerGroupHeartbeatResponse(null); + } return new ClientResponse( new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1), request.handler(), diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java index f9e46571795..430cc3ae84f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; import org.apache.kafka.common.metrics.KafkaMetric; @@ -58,6 +59,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -67,6 +69,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -363,7 +366,7 @@ public class ShareHeartbeatRequestManagerTest { @ParameterizedTest @MethodSource("errorProvider") public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) { - // Handling errors on the second heartbeat + // Handling errors on the second heartbeat time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); @@ -422,6 +425,46 @@ public class ShareHeartbeatRequestManagerTest { } } + @ParameterizedTest + @ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG}) + public void testUnsupportedVersionGeneratedOnTheBroker(String errorMsg) { + mockResponseWithException(new UnsupportedVersionException(errorMsg), true); + + ArgumentCaptor errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); + verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); + ErrorEvent errorEvent = errorEventArgumentCaptor.getValue(); + assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error()); + assertEquals(errorMsg, errorEvent.error().getMessage()); + clearInvocations(backgroundEventHandler); + } + + @ParameterizedTest + @ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG}) + public void testUnsupportedVersionGeneratedOnTheClient(String errorMsg) { + mockResponseWithException(new UnsupportedVersionException(errorMsg), false); + + ArgumentCaptor errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); + verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); + ErrorEvent errorEvent = errorEventArgumentCaptor.getValue(); + assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error()); + assertEquals(errorMsg, errorEvent.error().getMessage()); + clearInvocations(backgroundEventHandler); + } + + private void mockResponseWithException(UnsupportedVersionException exception, boolean isFromBroker) { + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + // Manually completing the response to test error handling + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + ClientResponse response = createHeartbeatResponseWithException( + result.unsentRequests.get(0), + exception, + isFromBroker); + result.unsentRequests.get(0).handler().onComplete(response); + } + @Test public void testHeartbeatState() { mockJoiningMemberData(); @@ -646,6 +689,27 @@ public class ShareHeartbeatRequestManagerTest { response); } + private ClientResponse createHeartbeatResponseWithException( + final NetworkClientDelegate.UnsentRequest request, + final UnsupportedVersionException exception, + final boolean isFromClient + ) { + ShareGroupHeartbeatResponse response = null; + if (!isFromClient) { + response = new ShareGroupHeartbeatResponse(null); + } + return new ClientResponse( + new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT, ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), "client-id", 1), + request.handler(), + "0", + time.milliseconds(), + time.milliseconds(), + false, + exception, + null, + response); + } + private ConsumerConfig config() { Properties prop = new Properties(); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);