diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 00f2530e314..2b75aaad409 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -58,6 +58,9 @@ final class StateManagerUtil {
             return false;
         }
 
+        if (enforceCheckpoint)
+            return true;
+
         // we can checkpoint if the the difference between the current and the previous snapshot is large enough
         long totalOffsetDelta = 0L;
         for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
@@ -66,7 +69,7 @@ final class StateManagerUtil {
 
         // when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
         // otherwise, we only overwrite the checkpoint if it is largely different from the old one
-        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
+        return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 88c17a6532f..b056fec93c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -193,13 +193,17 @@ public class TaskManager {
 
             try {
                 task.suspend();
+                // we need to enforce a checkpoint that removes the corrupted partitions
+                task.postCommit(true);
             } catch (final RuntimeException swallow) {
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
            }
             task.closeDirty();
+
+            // For active tasks pause their input partitions so we won't poll any more records
+            // for this task until it has been re-initialized;
+            // Note, closeDirty already clears the partitiongroup for the task.
             if (task.isActive()) {
-                // Pause so we won't poll any more records for this task until it has been re-initialized
-                // Note, closeDirty already clears the partitiongroup for the task.
                 final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
                 final Set<TopicPartition> taskInputPartitions = task.inputPartitions();
                 final Set<TopicPartition> assignedToPauseAndReset =
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 714fa234888..27ac9121ceb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1609,12 +1609,11 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldCheckpointWithCreatedStateOnClose() {
+    public void shouldNotCheckpointOnCloseCreated() {
         stateManager.flush();
         EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
         stateManager.checkpoint();
-        EasyMock.expectLastCall();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition1));
+        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
@@ -1629,16 +1628,18 @@ public class StreamTaskTest {
         assertFalse(source1.initialized);
         assertFalse(source1.closed);
 
+        EasyMock.verify(stateManager, recordCollector);
+
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
     }
 
     @Test
-    public void shouldNotCheckpointOnCloseRestoringIfNoProgress() {
+    public void shouldCheckpointOnCloseRestoringIfNoProgress() {
         stateManager.flush();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
+        EasyMock.expectLastCall().once();
         stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
         EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
         EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 6e878e50b17..239a7d7ca34 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -561,9 +561,19 @@ public class TaskManagerTest {
     public void shouldReviveCorruptTasks() {
         final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
         stateManager.markChangelogAsCorrupted(taskId00Partitions);
+        EasyMock.expectLastCall().once();
         replay(stateManager);
 
-        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
+        final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
+        final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
+            @Override
+            public void postCommit(final boolean enforceCheckpoint) {
+                if (enforceCheckpoint) {
+                    enforcedCheckpoint.set(true);
+                }
+                super.postCommit(enforceCheckpoint);
+            }
+        };
 
         // `handleAssignment`
         expectRestoreToBeCompleted(consumer, changeLogReader);
@@ -586,6 +596,7 @@ public class TaskManagerTest {
         taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
 
         assertThat(task00.state(), is(Task.State.CREATED));
+        assertThat(enforcedCheckpoint.get(), is(true));
         assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
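
For reference, below is a minimal standalone sketch of the checkpoint decision after this patch, as implied by the StateManagerUtil hunks above. It is not the Apache Kafka implementation: the class name, the String-keyed offset maps, the null-snapshot guard, the 10_000 threshold value, and the main method are assumptions used to make the example self-contained; only the enforceCheckpoint short-circuit and the comparison against an OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT-style constant come from the diff.

import java.util.Map;

// Illustrative sketch only -- not the Apache Kafka implementation.
// It mirrors the decision logic of the first hunk: an enforced checkpoint is
// always written once a snapshot exists, while a regular checkpoint is only
// rewritten after enough offset progress has accumulated.
public final class CheckpointDecisionSketch {

    // Threshold value assumed for illustration; the real constant is
    // OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT in StateManagerUtil.
    private static final long OFFSET_DELTA_THRESHOLD = 10_000L;

    private CheckpointDecisionSketch() { }

    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldOffsetSnapshot,
                                    final Map<String, Long> newOffsetSnapshot) {
        // assumed guard for the sketch: without a previous snapshot there is nothing to compare against
        if (oldOffsetSnapshot == null) {
            return false;
        }

        // post-patch behavior: an enforced checkpoint (e.g. when reviving a corrupted
        // task) is written even if no offsets have moved since the last snapshot
        if (enforceCheckpoint) {
            return true;
        }

        // otherwise only checkpoint once the accumulated offset delta is large enough
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
            totalOffsetDelta += Math.abs(entry.getValue() - oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L));
        }
        return totalOffsetDelta > OFFSET_DELTA_THRESHOLD;
    }

    public static void main(final String[] args) {
        final Map<String, Long> previous = Map.of("changelog-0", 100L);
        final Map<String, Long> unchanged = Map.of("changelog-0", 100L);

        // Before the patch the enforced case still required a non-zero delta; now it is unconditional.
        System.out.println(checkpointNeeded(true, previous, unchanged));   // true
        System.out.println(checkpointNeeded(false, previous, unchanged));  // false
    }
}

Run as a plain Java file, this prints true for the enforced case with no offset progress (which the old ternary would have rejected) and false for the non-enforced case, which is the behavioral change the updated StreamTaskTest and TaskManagerTest expectations exercise.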