KAFKA-16816: Remove unneeded FencedInstanceId support on commit path for new consumer (#17559)

Reviewers: Lianet Magrans <lmagrans@confluent.io>
Author: TaiJuWu, 2024-11-05 21:33:23 +08:00 (committed by GitHub)
parent c91243a4b7
commit ee3cea05aa
5 changed files with 14 additions and 150 deletions


@@ -873,7 +873,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires
      * before successful completion of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     * and this instance gets fenced by the broker.
      */
     @Override
     public void commitSync() {
@@ -916,7 +917,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
      * of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     * and this instance gets fenced by the broker.
      */
     @Override
     public void commitSync(Duration timeout) {
@@ -964,7 +966,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
      * of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     * and this instance gets fenced by the broker.
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@@ -1012,7 +1015,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * is too large or if the topic does not exist).
      * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
      * of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     * and this instance gets fenced by the broker.
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
@@ -1022,7 +1026,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition.
      * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     * and this instance gets fenced by the broker.
      */
     @Override
     public void commitAsync() {
@@ -1045,7 +1050,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * (and variants) returns.
      *
      * @param callback Callback to invoke when the commit completes
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     * and this instance gets fenced by the broker.
      */
     @Override
     public void commitAsync(OffsetCommitCallback callback) {
@@ -1072,7 +1078,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it
      * is safe to mutate the map after returning.
      * @param callback Callback to invoke when the commit completes
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     * and this instance gets fenced by the broker.
      */
     @Override
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
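
For background on the reworded javadoc above, an illustrative sketch rather than part of this commit: FencedInstanceIdException is specific to static membership, where a second consumer that reuses the same group.instance.id fences the first one, and with this change the exception is only documented for the classic group protocol. The broker address, group, and topic names below are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ClassicStaticMemberCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");            // placeholder group
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");      // static membership
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic");            // fencing applies only to the classic protocol
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));    // placeholder topic
            consumer.poll(Duration.ofMillis(100));
            try {
                consumer.commitSync();
            } catch (FencedInstanceIdException e) {
                // Another instance reused the same group.instance.id; this one is
                // permanently fenced and should be closed rather than retried.
                throw e;
            }
        }
    }
}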


@@ -75,7 +75,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
@@ -117,7 +116,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -249,7 +247,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
     private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
-    private final AtomicBoolean asyncCommitFenced;
     // Last triggered async commit future. Used to wait until all previous async commits are completed.
     // We only need to keep track of the last one, since they are guaranteed to complete in order.
     private CompletableFuture<Void> lastPendingAsyncCommit = null;
@@ -336,7 +333,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
             backgroundEventHandler);
         this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
-        this.asyncCommitFenced = new AtomicBoolean(false);
         this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
         final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
             logContext,
@@ -448,7 +444,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.clientTelemetryReporter = Optional.empty();
         this.autoCommitEnabled = autoCommitEnabled;
         this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
-        this.asyncCommitFenced = new AtomicBoolean(false);
     }

     AsyncKafkaConsumer(LogContext logContext,
@@ -511,7 +506,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             backgroundEventHandler
         );
         this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
-        this.asyncCommitFenced = new AtomicBoolean(false);
         Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(
             time,
             logContext,
@@ -766,10 +760,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets);
         }

-        if (t instanceof FencedInstanceIdException) {
-            asyncCommitFenced.set(true);
-        }
-
         if (callback == null) {
             if (t != null) {
                 log.error("Offset commit with offsets {} failed", offsets, t);
@@ -786,7 +776,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {

     private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
         maybeThrowInvalidGroupIdException();
-        maybeThrowFencedInstanceException();
         offsetCommitCallbackInvoker.executeCallbacks();

         Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();
@@ -1657,7 +1646,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {

     @Override
     public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
-        maybeThrowFencedInstanceException();
         offsetCommitCallbackInvoker.executeCallbacks();
         try {
             applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
@@ -1940,20 +1928,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         return kafkaConsumerMetrics;
     }

-    private void maybeThrowFencedInstanceException() {
-        if (asyncCommitFenced.get()) {
-            String groupInstanceId = "unknown";
-            if (!groupMetadata.get().isPresent()) {
-                log.error("No group metadata found although a group ID was provided. This is a bug!");
-            } else if (!groupMetadata.get().get().groupInstanceId().isPresent()) {
-                log.error("No group instance ID found although the consumer is fenced. This is a bug!");
-            } else {
-                groupInstanceId = groupMetadata.get().get().groupInstanceId().get();
-            }
-            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + groupInstanceId);
-        }
-    }
-
     // Visible for testing
     SubscriptionState subscriptions() {
         return subscriptions;
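
For illustration, not part of the diff: with asyncCommitFenced and maybeThrowFencedInstanceException() removed, a failed asynchronous commit reaches the application only through its OffsetCommitCallback; it is no longer recorded and rethrown from a later commitSync(), commitAsync(), or poll(). A minimal caller-side sketch, with a hypothetical helper class and logging choice:

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class AsyncCommitErrorHandling {
    // Hypothetical helper: for the new consumer, the callback is now the single
    // place where an async commit failure is observed.
    static void commitAsyncWithHandling(Consumer<?, ?> consumer) {
        consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) -> {
            if (exception != null) {
                // Handle or log here; subsequent consumer calls will not rethrow this.
                System.err.println("Async commit failed for " + offsets + ": " + exception);
            }
        });
    }
}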


@@ -735,11 +735,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
                 coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs);
                 future.completeExceptionally(error.exception());
                 return;
-            } else if (error == Errors.FENCED_INSTANCE_ID) {
-                String fencedError = "OffsetCommit failed due to group instance id fenced: " + groupInstanceId;
-                log.error(fencedError);
-                future.completeExceptionally(new CommitFailedException(fencedError));
-                return;
             } else if (error == Errors.OFFSET_METADATA_TOO_LARGE ||
                 error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                 future.completeExceptionally(error.exception());
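
The effect of the deletion above: FENCED_INSTANCE_ID loses its dedicated branch and its CommitFailedException wrapper, so if it ever surfaced here it would complete the future with the error's own exception. A condensed sketch of the resulting dispatch shape follows; the branch bodies mirror the context above, while the final fall-through is an assumption about the unshown remainder of the method, not its literal body.

// Condensed sketch, not the literal method body:
void handleOffsetCommitError(Errors error, CompletableFuture<Void> future, long currentTimeMs) {
    if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
        // Coordinator errors: forget the coordinator and fail the request.
        coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs);
        future.completeExceptionally(error.exception());
    } else if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
        future.completeExceptionally(error.exception());
    } else {
        // With the FENCED_INSTANCE_ID branch removed, that error now takes the
        // same generic path as other unexpected failures (assumed shape), and
        // surfaces as FencedInstanceIdException rather than CommitFailedException.
        future.completeExceptionally(error.exception());
    }
}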


@@ -57,7 +57,6 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
@@ -334,23 +333,6 @@ public class AsyncKafkaConsumerTest {
             new GroupAuthorizationException("Group authorization exception"));
     }

-    @Test
-    public void testCommitAsyncWithFencedException() {
-        consumer = newConsumer();
-        completeCommitSyncApplicationEventSuccessfully();
-        final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
-        MockCommitCallback callback = new MockCommitCallback();
-
-        assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
-
-        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
-        verify(applicationEventHandler).add(commitEventCaptor.capture());
-        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
-        commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-
-        assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> consumer.commitAsync());
-    }
-
     @Test
     public void testCommitted() {
         time = new MockTime(1);
@@ -610,52 +592,6 @@ public class AsyncKafkaConsumerTest {
         }
     }

-    @Test
-    public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() {
-        final String groupId = "consumerGroupA";
-        final String groupInstanceId = "groupInstanceId1";
-        final Properties props = requiredConsumerConfigAndGroupId(groupId);
-        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-        consumer = newConsumer(config);
-        completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-        final TopicPartition tp = new TopicPartition("foo", 0);
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(Collections.singleton(tp));
-        completeSeekUnvalidatedEventSuccessfully();
-        consumer.seek(tp, 20);
-
-        assertDoesNotThrow(() -> consumer.commitAsync());
-
-        Exception e = assertThrows(FencedInstanceIdException.class, () -> consumer.commitAsync());
-        assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
-    }
-
-    @Test
-    public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
-        final String groupId = "consumerGroupA";
-        final String groupInstanceId = "groupInstanceId1";
-        final Properties props = requiredConsumerConfigAndGroupId(groupId);
-        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-        consumer = newConsumer(config);
-        completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-        final TopicPartition tp = new TopicPartition("foo", 0);
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(Collections.singleton(tp));
-        completeSeekUnvalidatedEventSuccessfully();
-        consumer.seek(tp, 20);
-
-        assertDoesNotThrow(() -> consumer.commitAsync());
-
-        Exception e = assertThrows(FencedInstanceIdException.class, () -> consumer.commitSync());
-        assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
-    }
-
     @Test
     public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
         final TopicPartition tp = new TopicPartition("foo", 0);
@@ -739,29 +675,6 @@ public class AsyncKafkaConsumerTest {
         return allValues.get(allValues.size() - 1);
     }

-    @Test
-    public void testPollTriggersFencedExceptionFromCommitAsync() {
-        final String groupId = "consumerGroupA";
-        final String groupInstanceId = "groupInstanceId1";
-        final Properties props = requiredConsumerConfigAndGroupId(groupId);
-        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-        consumer = newConsumer(config);
-        completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-        final TopicPartition tp = new TopicPartition("foo", 0);
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(Collections.singleton(tp));
-        completeSeekUnvalidatedEventSuccessfully();
-        consumer.seek(tp, 20);
-
-        assertDoesNotThrow(() -> consumer.commitAsync());
-
-        Exception e = assertThrows(FencedInstanceIdException.class, () -> consumer.poll(Duration.ZERO));
-        assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
-    }
-
     @Test
     public void testEnsurePollExecutedCommitAsyncCallbacks() {
         consumer = newConsumer();
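
A hypothetical check of the new contract, not in this commit, could reuse this test class's helpers (newConsumer, completeCommitAsyncApplicationEventExceptionally, MockCommitCallback, mockTopicPartitionOffset): an async commit failure is delivered to the callback only, and later calls must not rethrow a remembered exception.

@Test
public void exampleAsyncCommitFailureStaysInCallback() {
    consumer = newConsumer();
    completeCommitAsyncApplicationEventExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception());
    MockCommitCallback callback = new MockCommitCallback();
    assertDoesNotThrow(() -> consumer.commitAsync(mockTopicPartitionOffset(), callback));
    // With the fenced-instance replay removed, a later call must not throw a
    // remembered commit exception (hypothetical assertion of the new contract).
    assertDoesNotThrow(() -> consumer.commitAsync());
}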


@@ -340,27 +340,8 @@ public class CommitRequestManagerTest {
         assertExceptionHandling(commitRequestManager, error, true);
     }

-    @ParameterizedTest
-    @MethodSource("commitSyncExpectedExceptions")
-    public void testCommitSyncFailsWithExpectedException(Errors commitError,
-                                                         Class<? extends Exception> expectedException) {
-        CommitRequestManager commitRequestManager = create(false, 100);
-        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
-            new TopicPartition("topic", 1),
-            new OffsetAndMetadata(0));
-
-        // Send sync offset commit that fails and verify it propagates the expected exception.
-        long deadlineMs = time.milliseconds() + retryBackoffMs;
-        CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
-        completeOffsetCommitRequestWithError(commitRequestManager, commitError);
-        assertFutureThrows(commitResult, expectedException);
-    }
-
     private static Stream<Arguments> commitSyncExpectedExceptions() {
         return Stream.of(
-            Arguments.of(Errors.FENCED_INSTANCE_ID, CommitFailedException.class),
             Arguments.of(Errors.UNKNOWN_MEMBER_ID, CommitFailedException.class),
             Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, Errors.OFFSET_METADATA_TOO_LARGE.exception().getClass()),
             Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE.exception().getClass()),
@@ -985,10 +966,6 @@
             case INVALID_COMMIT_OFFSET_SIZE:
                 assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
                 break;
-            case FENCED_INSTANCE_ID:
-                // This is a fatal failure, so we should not retry
-                assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
-                break;
             default:
                 if (errors.exception() instanceof RetriableException && requestShouldBeRetried) {
                     assertRetryBackOff(commitRequestManager, remainBackoffMs);
@@ -1279,7 +1256,6 @@
             Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
             Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
             Arguments.of(Errors.REQUEST_TIMED_OUT),
-            Arguments.of(Errors.FENCED_INSTANCE_ID),
             Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
             Arguments.of(Errors.STALE_MEMBER_EPOCH),
             Arguments.of(Errors.UNKNOWN_MEMBER_ID));
@@ -1299,7 +1275,6 @@
             Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
             Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
             Arguments.of(Errors.REQUEST_TIMED_OUT),
-            Arguments.of(Errors.FENCED_INSTANCE_ID),
             Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
             Arguments.of(Errors.UNKNOWN_MEMBER_ID),
             // Adding STALE_MEMBER_EPOCH as non-retriable here because it is only retried if a new