HOTFIX: move rebalanceInProgress check to skip commit during handleCorrupted (#10444)

Minor followup to #10407 -- we need to move the rebalanceInProgress check down into the commitAndFillInConsumedOffsetsAndMetadataPerTaskMap method, which is invoked during handleCorrupted; otherwise we may attempt to commit during a rebalance, which will fail.

Reviewers: Matthias J. Sax <mjsax@confluent.io>
This commit is contained in:
A. Sophie Blee-Goldman 2021-03-30 18:55:38 -07:00 committed by GitHub
parent b35704a843
commit 3eff8d39f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 12 deletions

View File

@ -532,6 +532,9 @@ public class TaskManager {
// as such we just need to skip those dirty tasks in the checkpoint
final Set<Task> dirtyTasks = new HashSet<>();
try {
// in handleRevocation we must call commitOffsetsOrTransaction() directly rather than
// commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the
// offset commit because we are in a rebalance
commitOffsetsOrTransaction(consumedOffsetsPerTask);
} catch (final TaskCorruptedException e) {
log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
@ -1002,28 +1005,34 @@ public class TaskManager {
*/
int commit(final Collection<Task> tasksToCommit) {
// value handed back to the caller; stays 0 unless one of the assignments below overwrites it
int committed = 0;
// NOTE(review): the guarded try/catch in the else-branch is repeated, unguarded, right after it,
// and consumedOffsetsAndMetadataPerTask is declared twice. This looks like the removed and added
// sides of a diff shown together without +/- markers -- confirm against the real file.
if (rebalanceInProgress) {
// -1 signals that nothing was committed because a rebalance is in progress
committed = -1;
} else {
// starts out empty; the helper fills it in with the offsets prepared for each task
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
try {
committed = commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, consumedOffsetsAndMetadataPerTask);
} catch (final TimeoutException timeoutException) {
// hand the timeout to every task recorded in the map via maybeInitTaskTimeoutOrThrow
consumedOffsetsAndMetadataPerTask
.keySet()
.forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
}
// same sequence as above, but without a rebalanceInProgress check in this method
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
try {
committed = commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, consumedOffsetsAndMetadataPerTask);
} catch (final TimeoutException timeoutException) {
// hand the timeout to every task recorded in the map via maybeInitTaskTimeoutOrThrow
consumedOffsetsAndMetadataPerTask
.keySet()
.forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
}
return committed;
}
/**
* @throws TaskMigratedException if committing offsets failed (non-EOS)
* or if the task producer got fenced (EOS)
* @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
* @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
* @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets
* @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
*/
private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit,
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
int committed = 0;
if (rebalanceInProgress) {
return -1;
}
int committed = 0;
for (final Task task : tasksToCommit) {
if (task.commitNeeded()) {
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
@ -1063,6 +1072,9 @@ public class TaskManager {
}
/**
* Caution: do not invoke this directly if it's possible a rebalance is occurring, as the commit will fail. If
* this is a possibility, prefer {@link #commitAndFillInConsumedOffsetsAndMetadataPerTaskMap} instead.
*
* @throws TaskMigratedException if committing offsets failed due to CommitFailedException (non-EOS)
* @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
* @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)

View File

@ -821,6 +821,57 @@ public class TaskManagerTest {
verify(consumer);
}
// Verifies that handleCorruption() leaves an uncorrupted task with a pending commit untouched
// (not prepared, not committed, still RUNNING) when it is called while a rebalance is in progress.
@Test
public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
// nice mock: calls on the state manager that are not explicitly expected are tolerated
final ProcessorStateManager stateManager = EasyMock.createNiceMock(ProcessorStateManager.class);
expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new File[0]);
// the task that is later reported as corrupted (taskId00)
final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
// make sure this will attempt to be committed and throw
// (i.e. the second task, taskId01, has a commit pending; presumably a commit attempted
// mid-rebalance would fail -- TODO confirm against the non-test code)
final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
uncorruptedActive.setCommitNeeded();
// handleAssignment
// both tasks are part of the assignment, and the task creator is stubbed to return both
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
assignment.putAll(taskId00Assignment);
assignment.putAll(taskId01Assignment);
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
// topology subscription updates may happen any number of times; they are not what is under test
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
topologyBuilder.addSubscribedTopicsFromMetadata(eq(singleton(topic1)), anyObject());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
// switch all mocks from recording expectations to replaying them
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader, stateDirectory, stateManager);
uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
// precondition: the uncorrupted task is running with a commit needed but not yet prepared or completed
assertThat(uncorruptedActive.state(), is(Task.State.RUNNING));
assertThat(uncorruptedActive.commitPrepared, is(false));
assertThat(uncorruptedActive.commitNeeded, is(true));
assertThat(uncorruptedActive.commitCompleted, is(false));
// enter the rebalance before reporting the corruption
taskManager.handleRebalanceStart(singleton(topic1));
assertThat(taskManager.isRebalanceInProgress(), is(true));
taskManager.handleCorruption(singleton(taskId00));
// the commit flags and state of the uncorrupted task are unchanged, so no commit was prepared or completed for it
assertThat(uncorruptedActive.commitPrepared, is(false));
assertThat(uncorruptedActive.commitNeeded, is(true));
assertThat(uncorruptedActive.commitCompleted, is(false));
assertThat(uncorruptedActive.state(), is(State.RUNNING));
// the consumer saw no calls beyond those recorded or stubbed above
verify(consumer);
}
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);