KAFKA-14196; Do not continue fetching partitions awaiting auto-commit prior to revocation (#12603)

When auto-commit is enabled with the "eager" rebalance strategy, the consumer will commit all offsets prior to revocation. Following recent changes, this offset commit is done asynchronously, which means there is an opportunity for fetches to continue returning data to the application. When this happens, the progress is lost following revocation, which results in duplicate consumption. This patch fixes the problem by adding a flag in `SubscriptionState` to ensure that partitions which are awaiting revocation will not continue being fetched.

Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Philip Nee 2022-09-12 21:02:13 -07:00 committed by GitHub
parent b9774c0b02
commit 536cdf692f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 1 deletions

View File

@ -759,6 +759,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// async commit offsets prior to rebalance if auto-commit enabled
// and there is no in-flight offset commit request
if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
maybeMarkPartitionsPendingRevocation();
autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
}
@ -859,6 +860,22 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return true;
}
private void maybeMarkPartitionsPendingRevocation() {
if (protocol != RebalanceProtocol.EAGER) {
return;
}
// When asynchronously committing offsets prior to the revocation of a set of partitions, there will be a
// window of time between when the offset commit is sent and when it returns and revocation completes. It is
// possible for pending fetches for these partitions to return during this time, which means the application's
// position may get ahead of the committed position prior to revocation. This can cause duplicate consumption.
// To prevent this, we mark the partitions as "pending revocation," which stops the Fetcher from sending new
// fetches or returning data from previous fetches to the user.
Set<TopicPartition> partitions = subscriptions.assignedPartitions();
log.debug("Marking assigned partitions pending for revocation: {}", partitions);
subscriptions.markPendingRevocation(partitions);
}
@Override
public void onLeavePrepare() {
// Save the current Generation, as the hb thread can change it at any time

View File

@ -738,6 +738,10 @@ public class SubscriptionState {
assignedState(tp).pause();
}
public synchronized void markPendingRevocation(Set<TopicPartition> tps) {
tps.forEach(tp -> assignedState(tp).markPendingRevocation());
}
public synchronized void resume(TopicPartition tp) {
assignedState(tp).resume();
}
@ -769,6 +773,7 @@ public class SubscriptionState {
private Long logStartOffset; // the log start offset
private Long lastStableOffset;
private boolean paused; // whether this partition has been paused by the user
private boolean pendingRevocation;
private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
private Long nextRetryTimeMs;
private Integer preferredReadReplica;
@ -777,6 +782,7 @@ public class SubscriptionState {
TopicPartitionState() {
this.paused = false;
this.pendingRevocation = false;
this.endOffsetRequested = false;
this.fetchState = FetchStates.INITIALIZING;
this.position = null;
@ -966,12 +972,16 @@ public class SubscriptionState {
this.paused = true;
}
private void markPendingRevocation() {
this.pendingRevocation = true;
}
private void resume() {
this.paused = false;
}
private boolean isFetchable() {
return !paused && hasValidPosition();
return !paused && !pendingRevocation && hasValidPosition();
}
private void highWatermark(Long highWatermark) {

View File

@ -272,6 +272,45 @@ public class FetcherTest {
}
}
@Test
public void testInflightFetchOnPendingPartitions() {
buildFetcher();
assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);
assertEquals(1, fetcher.sendFetches());
subscriptions.markPendingRevocation(singleton(tp0));
client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
consumerClient.poll(time.timer(0));
assertNull(fetchedRecords().get(tp0));
}
@Test
public void testFetchingPendingPartitions() {
buildFetcher();
assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);
// normal fetch
assertEquals(1, fetcher.sendFetches());
client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
fetchedRecords();
assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position
// mark partition unfetchable
subscriptions.markPendingRevocation(singleton(tp0));
assertEquals(0, fetcher.sendFetches());
consumerClient.poll(time.timer(0));
assertFalse(fetcher.hasCompletedFetches());
fetchedRecords();
assertEquals(4L, subscriptions.position(tp0).offset);
}
@Test
public void testFetchWithNoTopicId() {
// Should work and default to using old request type.
@ -2283,6 +2322,37 @@ public class FetcherTest {
assertEquals(5, subscriptions.position(tp0).offset);
}
@Test
public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
buildFetcher();
assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 100);
subscriptions.seek(tp0, 100);
subscriptions.seek(tp0, 100);
assertEquals(100, subscriptions.position(tp0).offset);
assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused
subscriptions.markPendingRevocation(singleton(tp0));
fetcher.resetOffsetsIfNeeded();
// once a partition is marked pending, it should not be fetchable
assertFalse(subscriptions.isOffsetResetNeeded(tp0));
assertFalse(subscriptions.isFetchable(tp0));
assertTrue(subscriptions.hasValidPosition(tp0));
assertEquals(100, subscriptions.position(tp0).offset);
subscriptions.seek(tp0, 100);
assertEquals(100, subscriptions.position(tp0).offset);
// reassignment should enable fetching of the same partition
subscriptions.unsubscribe();
assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 100);
assertEquals(100, subscriptions.position(tp0).offset);
assertTrue(subscriptions.isFetchable(tp0));
}
@Test
public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
buildFetcher();

View File

@ -256,6 +256,16 @@ public class SubscriptionStateTest {
assertTrue(state.isFetchable(tp0));
}
@Test
public void testMarkingPartitionPending() {
state.assignFromUser(singleton(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
state.markPendingRevocation(singleton(tp0));
assertFalse(state.isFetchable(tp0));
assertFalse(state.isPaused(tp0));
}
@Test
public void invalidPositionUpdate() {
state.subscribe(singleton(topic), rebalanceListener);