diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 1b12ae3d4b4..619659f3274 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -873,7 +873,8 @@ public class KafkaConsumer implements Consumer { * is too large or if the topic does not exist). * @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires * before successful completion of the offset commit - * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker. + * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol + * and this instance gets fenced by broker. */ @Override public void commitSync() { @@ -916,7 +917,8 @@ public class KafkaConsumer implements Consumer { * is too large or if the topic does not exist). * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion * of the offset commit - * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker. + * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol + * and this instance gets fenced by broker. */ @Override public void commitSync(Duration timeout) { @@ -964,7 +966,8 @@ public class KafkaConsumer implements Consumer { * is too large or if the topic does not exist). * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion * of the offset commit - * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker. + * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol + * and this instance gets fenced by broker. */ @Override public void commitSync(final Map offsets) { @@ -1012,7 +1015,8 @@ public class KafkaConsumer implements Consumer { * is too large or if the topic does not exist). * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion * of the offset commit - * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker. + * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol + * and this instance gets fenced by broker. */ @Override public void commitSync(final Map offsets, final Duration timeout) { @@ -1022,7 +1026,8 @@ public class KafkaConsumer implements Consumer { /** * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition. * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)} - * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker. + * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol + * and this instance gets fenced by broker. */ @Override public void commitAsync() { @@ -1045,7 +1050,8 @@ public class KafkaConsumer implements Consumer { * (and variants) returns. * * @param callback Callback to invoke when the commit completes - * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker. + * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol + * and this instance gets fenced by broker. */ @Override public void commitAsync(OffsetCommitCallback callback) { @@ -1072,7 +1078,8 @@ public class KafkaConsumer implements Consumer { * @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it * is safe to mutate the map after returning. * @param callback Callback to invoke when the commit completes - * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker. + * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol + * and this instance gets fenced by broker. */ @Override public void commitAsync(final Map offsets, OffsetCommitCallback callback) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index fccd69c86b8..f4787580361 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -75,7 +75,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.AuthenticationException; -import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.InvalidTopicException; @@ -117,7 +116,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -249,7 +247,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private boolean cachedSubscriptionHasAllFetchPositions; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; - private final AtomicBoolean asyncCommitFenced; // Last triggered async commit future. Used to wait until all previous async commits are completed. // We only need to keep track of the last one, since they are guaranteed to complete in order. private CompletableFuture lastPendingAsyncCommit = null; @@ -336,7 +333,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), backgroundEventHandler); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); - this.asyncCommitFenced = new AtomicBoolean(false); this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig)); final Supplier requestManagersSupplier = RequestManagers.supplier(time, logContext, @@ -448,7 +444,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { this.clientTelemetryReporter = Optional.empty(); this.autoCommitEnabled = autoCommitEnabled; this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); - this.asyncCommitFenced = new AtomicBoolean(false); } AsyncKafkaConsumer(LogContext logContext, @@ -511,7 +506,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { backgroundEventHandler ); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); - this.asyncCommitFenced = new AtomicBoolean(false); Supplier requestManagersSupplier = RequestManagers.supplier( time, logContext, @@ -766,10 +760,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets); } - if (t instanceof FencedInstanceIdException) { - asyncCommitFenced.set(true); - } - if (callback == null) { if (t != null) { log.error("Offset commit with offsets {} failed", offsets, t); @@ -786,7 +776,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private CompletableFuture commit(final CommitEvent commitEvent) { maybeThrowInvalidGroupIdException(); - maybeThrowFencedInstanceException(); offsetCommitCallbackInvoker.executeCallbacks(); Map offsets = commitEvent.offsets(); @@ -1657,7 +1646,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { @Override public boolean updateAssignmentMetadataIfNeeded(Timer timer) { - maybeThrowFencedInstanceException(); offsetCommitCallbackInvoker.executeCallbacks(); try { applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer))); @@ -1940,20 +1928,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { return kafkaConsumerMetrics; } - private void maybeThrowFencedInstanceException() { - if (asyncCommitFenced.get()) { - String groupInstanceId = "unknown"; - if (!groupMetadata.get().isPresent()) { - log.error("No group metadata found although a group ID was provided. This is a bug!"); - } else if (!groupMetadata.get().get().groupInstanceId().isPresent()) { - log.error("No group instance ID found although the consumer is fenced. This is a bug!"); - } else { - groupInstanceId = groupMetadata.get().get().groupInstanceId().get(); - } - throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + groupInstanceId); - } - } - // Visible for testing SubscriptionState subscriptions() { return subscriptions; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 5a9e0455d29..dfa2520a02b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -735,11 +735,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs); future.completeExceptionally(error.exception()); return; - } else if (error == Errors.FENCED_INSTANCE_ID) { - String fencedError = "OffsetCommit failed due to group instance id fenced: " + groupInstanceId; - log.error(fencedError); - future.completeExceptionally(new CommitFailedException(fencedError)); - return; } else if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) { future.completeExceptionally(error.exception()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index f9db6ba05da..380b7082b97 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -57,7 +57,6 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; @@ -334,23 +333,6 @@ public class AsyncKafkaConsumerTest { new GroupAuthorizationException("Group authorization exception")); } - @Test - public void testCommitAsyncWithFencedException() { - consumer = newConsumer(); - completeCommitSyncApplicationEventSuccessfully(); - final Map offsets = mockTopicPartitionOffset(); - MockCommitCallback callback = new MockCommitCallback(); - - assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); - - final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); - verify(applicationEventHandler).add(commitEventCaptor.capture()); - final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); - commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception()); - - assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> consumer.commitAsync()); - } - @Test public void testCommitted() { time = new MockTime(1); @@ -610,52 +592,6 @@ public class AsyncKafkaConsumerTest { } } - @Test - public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() { - final String groupId = "consumerGroupA"; - final String groupInstanceId = "groupInstanceId1"; - final Properties props = requiredConsumerConfigAndGroupId(groupId); - props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId); - final ConsumerConfig config = new ConsumerConfig(props); - consumer = newConsumer(config); - completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception()); - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); - final TopicPartition tp = new TopicPartition("foo", 0); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(tp)); - completeSeekUnvalidatedEventSuccessfully(); - consumer.seek(tp, 20); - - assertDoesNotThrow(() -> consumer.commitAsync()); - - Exception e = assertThrows(FencedInstanceIdException.class, () -> consumer.commitAsync()); - assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); - } - - @Test - public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { - final String groupId = "consumerGroupA"; - final String groupInstanceId = "groupInstanceId1"; - final Properties props = requiredConsumerConfigAndGroupId(groupId); - props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId); - final ConsumerConfig config = new ConsumerConfig(props); - consumer = newConsumer(config); - completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception()); - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); - final TopicPartition tp = new TopicPartition("foo", 0); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(tp)); - completeSeekUnvalidatedEventSuccessfully(); - consumer.seek(tp, 20); - - assertDoesNotThrow(() -> consumer.commitAsync()); - - Exception e = assertThrows(FencedInstanceIdException.class, () -> consumer.commitSync()); - assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); - } - @Test public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { final TopicPartition tp = new TopicPartition("foo", 0); @@ -739,29 +675,6 @@ public class AsyncKafkaConsumerTest { return allValues.get(allValues.size() - 1); } - @Test - public void testPollTriggersFencedExceptionFromCommitAsync() { - final String groupId = "consumerGroupA"; - final String groupInstanceId = "groupInstanceId1"; - final Properties props = requiredConsumerConfigAndGroupId(groupId); - props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId); - final ConsumerConfig config = new ConsumerConfig(props); - consumer = newConsumer(config); - completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception()); - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); - final TopicPartition tp = new TopicPartition("foo", 0); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(tp)); - completeSeekUnvalidatedEventSuccessfully(); - consumer.seek(tp, 20); - - assertDoesNotThrow(() -> consumer.commitAsync()); - - Exception e = assertThrows(FencedInstanceIdException.class, () -> consumer.poll(Duration.ZERO)); - assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); - } - @Test public void testEnsurePollExecutedCommitAsyncCallbacks() { consumer = newConsumer(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 1a805e642fc..9ded26b9e71 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -340,27 +340,8 @@ public class CommitRequestManagerTest { assertExceptionHandling(commitRequestManager, error, true); } - @ParameterizedTest - @MethodSource("commitSyncExpectedExceptions") - public void testCommitSyncFailsWithExpectedException(Errors commitError, - Class expectedException) { - CommitRequestManager commitRequestManager = create(false, 100); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); - - Map offsets = Collections.singletonMap( - new TopicPartition("topic", 1), - new OffsetAndMetadata(0)); - - // Send sync offset commit that fails and verify it propagates the expected exception. - long deadlineMs = time.milliseconds() + retryBackoffMs; - CompletableFuture commitResult = commitRequestManager.commitSync(offsets, deadlineMs); - completeOffsetCommitRequestWithError(commitRequestManager, commitError); - assertFutureThrows(commitResult, expectedException); - } - private static Stream commitSyncExpectedExceptions() { return Stream.of( - Arguments.of(Errors.FENCED_INSTANCE_ID, CommitFailedException.class), Arguments.of(Errors.UNKNOWN_MEMBER_ID, CommitFailedException.class), Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, Errors.OFFSET_METADATA_TOO_LARGE.exception().getClass()), Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE.exception().getClass()), @@ -985,10 +966,6 @@ public class CommitRequestManagerTest { case INVALID_COMMIT_OFFSET_SIZE: assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE); break; - case FENCED_INSTANCE_ID: - // This is a fatal failure, so we should not retry - assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE); - break; default: if (errors.exception() instanceof RetriableException && requestShouldBeRetried) { assertRetryBackOff(commitRequestManager, remainBackoffMs); @@ -1279,7 +1256,6 @@ public class CommitRequestManagerTest { Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION), Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE), Arguments.of(Errors.REQUEST_TIMED_OUT), - Arguments.of(Errors.FENCED_INSTANCE_ID), Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED), Arguments.of(Errors.STALE_MEMBER_EPOCH), Arguments.of(Errors.UNKNOWN_MEMBER_ID)); @@ -1299,7 +1275,6 @@ public class CommitRequestManagerTest { Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION), Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE), Arguments.of(Errors.REQUEST_TIMED_OUT), - Arguments.of(Errors.FENCED_INSTANCE_ID), Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED), Arguments.of(Errors.UNKNOWN_MEMBER_ID), // Adding STALE_MEMBER_EPOCH as non-retriable here because it is only retried if a new