KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions (#9170)

In order to do this, I also removed the optimization such that once enforced checkpoint is set to true, we always checkpoint unless the state stores are not initialized at all (i.e. the snapshot is null).

Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <ableegoldman@gmail.com>
This commit is contained in:
Guozhang Wang 2020-08-12 21:20:53 -07:00 committed by GitHub
parent a15c1a9302
commit d0800b3f7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 29 additions and 10 deletions

View File

@ -58,6 +58,9 @@ final class StateManagerUtil {
return false;
}
if (enforceCheckpoint)
return true;
// we can checkpoint if the the difference between the current and the previous snapshot is large enough
long totalOffsetDelta = 0L;
for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
@ -66,7 +69,7 @@ final class StateManagerUtil {
// when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
// otherwise, we only overwrite the checkpoint if it is largely different from the old one
return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
}
/**

View File

@ -193,13 +193,17 @@ public class TaskManager {
try {
task.suspend();
// we need to enforce a checkpoint that removes the corrupted partitions
task.postCommit(true);
} catch (final RuntimeException swallow) {
log.error("Error suspending corrupted task {} ", task.id(), swallow);
}
task.closeDirty();
// For active tasks pause their input partitions so we won't poll any more records
// for this task until it has been re-initialized;
// Note, closeDirty already clears the partitiongroup for the task.
if (task.isActive()) {
// Pause so we won't poll any more records for this task until it has been re-initialized
// Note, closeDirty already clears the partitiongroup for the task.
final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
final Set<TopicPartition> taskInputPartitions = task.inputPartitions();
final Set<TopicPartition> assignedToPauseAndReset =

View File

@ -1609,12 +1609,11 @@ public class StreamTaskTest {
}
@Test
public void shouldCheckpointWithCreatedStateOnClose() {
public void shouldNotCheckpointOnCloseCreated() {
stateManager.flush();
EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
stateManager.checkpoint();
EasyMock.expectLastCall();
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1));
EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
@ -1629,16 +1628,18 @@ public class StreamTaskTest {
assertFalse(source1.initialized);
assertFalse(source1.closed);
EasyMock.verify(stateManager, recordCollector);
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
}
@Test
public void shouldNotCheckpointOnCloseRestoringIfNoProgress() {
public void shouldCheckpointOnCloseRestoringIfNoProgress() {
stateManager.flush();
EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
EasyMock.expectLastCall().once();
stateManager.checkpoint();
EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();

View File

@ -561,9 +561,19 @@ public class TaskManagerTest {
public void shouldReviveCorruptTasks() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
EasyMock.expectLastCall().once();
replay(stateManager);
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
public void postCommit(final boolean enforceCheckpoint) {
if (enforceCheckpoint) {
enforcedCheckpoint.set(true);
}
super.postCommit(enforceCheckpoint);
}
};
// `handleAssignment`
expectRestoreToBeCompleted(consumer, changeLogReader);
@ -586,6 +596,7 @@ public class TaskManagerTest {
taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
assertThat(task00.state(), is(Task.State.CREATED));
assertThat(enforcedCheckpoint.get(), is(true));
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());