diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index fefbfbc09b6..953a4e81c0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -47,6 +47,7 @@ import static java.lang.String.format;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
 import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
 
 /**
  * ProcessorStateManager is the source of truth for the current offset for each state store,
@@ -225,7 +226,8 @@ public class ProcessorStateManager implements StateManager {
                 log.info("State store {} is not logged and hence would not be restored", store.stateStore.name());
             } else if (store.offset() == null) {
                 if (loadedCheckpoints.containsKey(store.changelogPartition)) {
-                    store.setOffset(loadedCheckpoints.remove(store.changelogPartition));
+                    final Long offset = changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
+                    store.setOffset(offset);
 
                     log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
                               store.stateStore.name(), store.offset, store.changelogPartition);
@@ -538,10 +540,10 @@ public class ProcessorStateManager implements StateManager {
             // store is logged, persistent, not corrupted, and has a valid current offset
             if (storeMetadata.changelogPartition != null &&
                 storeMetadata.stateStore.persistent() &&
-                storeMetadata.offset != null &&
                 !storeMetadata.corrupted) {
-                checkpointingOffsets.put(storeMetadata.changelogPartition, storeMetadata.offset);
+                final long checkpointableOffset = checkpointableOffsetFromChangelogOffset(storeMetadata.offset);
+                checkpointingOffsets.put(storeMetadata.changelogPartition, checkpointableOffset);
             }
         }
 
@@ -578,4 +580,14 @@ public class ProcessorStateManager implements StateManager {
 
         return found.isEmpty() ? null : found.get(0);
     }
+
+    // Pass in a sentinel value to checkpoint when the changelog offset is not yet initialized/known
+    private long checkpointableOffsetFromChangelogOffset(final Long offset) {
+        return offset != null ? offset : OFFSET_UNKNOWN;
+    }
+
+    // Convert the written offsets in the checkpoint file back to the changelog offset
+    private Long changelogOffsetFromCheckpointedOffset(final long offset) {
+        return offset != OFFSET_UNKNOWN ? offset : null;
+    }
 }
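
Reviewer note: the two private helpers above are the entire translation layer between the in-memory convention (a `null` changelog offset means "not yet initialized") and the on-disk sentinel `OFFSET_UNKNOWN`. Keeping `null` out of the file keeps a missing entry meaning "no checkpoint at all", while `-2` means "store known, offset unknown". A minimal standalone sketch of the round trip; only the two method bodies come from the patch, the wrapper class and `main` are illustrative:

```java
public class SentinelRoundTripSketch {
    // mirrors OffsetCheckpoint.OFFSET_UNKNOWN introduced by this patch
    static final long OFFSET_UNKNOWN = -2L;

    // null (unknown) maps to the sentinel on the way into the checkpoint file
    static long checkpointableOffsetFromChangelogOffset(final Long offset) {
        return offset != null ? offset : OFFSET_UNKNOWN;
    }

    // the sentinel maps back to null on the way out
    static Long changelogOffsetFromCheckpointedOffset(final long offset) {
        return offset != OFFSET_UNKNOWN ? offset : null;
    }

    public static void main(final String[] args) {
        // an uninitialized offset survives a write/read cycle as -2, then becomes null again
        final long written = checkpointableOffsetFromChangelogOffset(null);
        System.out.println(written);                                        // -2
        System.out.println(changelogOffsetFromCheckpointedOffset(written)); // null
        // a real offset passes through unchanged
        System.out.println(changelogOffsetFromCheckpointedOffset(
            checkpointableOffsetFromChangelogOffset(42L)));                 // 42
    }
}
```
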
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 173efff808f..c52ebdf1876 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task.State;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.slf4j.Logger;
 
@@ -242,9 +243,14 @@ public class TaskManager {
 
         for (final Task task : tasksToClose) {
             try {
-                if (!task.isActive()) {
-                    // Active tasks should have already been suspended and committed during handleRevocation, but
-                    // standbys must be suspended/committed/closed all here
+                if (task.isActive()) {
+                    // Active tasks are revoked and suspended/committed during #handleRevocation
+                    if (!task.state().equals(State.SUSPENDED)) {
+                        log.error("Active task {} should be suspended prior to attempting to close but was in {}",
+                                  task.id(), task.state());
+                        throw new IllegalStateException("Active task " + task.id() + " should have been suspended");
+                    }
+                } else {
                     task.suspend();
                     task.prepareCommit();
                     task.postCommit();
@@ -268,10 +274,19 @@ public class TaskManager {
             final Task newTask;
             try {
                 if (oldTask.isActive()) {
+                    if (!oldTask.state().equals(State.SUSPENDED)) {
+                        // Active tasks are revoked and suspended/committed during #handleRevocation
+                        log.error("Active task {} should be suspended prior to attempting to close but was in {}",
+                                  oldTask.id(), oldTask.state());
+                        throw new IllegalStateException("Active task " + oldTask.id() + " should have been suspended");
+                    }
                     final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
                     newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    cleanUpTaskProducer(oldTask, taskCloseExceptions);
                 } else {
-                    oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already
+                    oldTask.suspend();
+                    oldTask.prepareCommit();
+                    oldTask.postCommit();
                     final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id());
                     newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer);
                 }
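
Reviewer note: the TaskManager changes turn an implicit assumption into an enforced invariant. By the time an active task reaches close or recycle, #handleRevocation must already have suspended and committed it, so any state other than SUSPENDED is a lifecycle bug worth failing fast on; standbys are never revoked, so they get the full suspend/prepareCommit/postCommit sequence inline. A simplified sketch of that guard; the `Task` interface below is a hypothetical stand-in, not the real org.apache.kafka.streams.processor.internals.Task:

```java
interface Task {
    enum State { CREATED, RUNNING, SUSPENDED, CLOSED }
    String id();
    State state();
    boolean isActive();
    void suspend();
    void prepareCommit();
    void postCommit();
}

final class TaskCloseGuard {
    // Mirrors the invariant the patch enforces before a task is closed or recycled.
    static void ensureReadyToClose(final Task task) {
        if (task.isActive()) {
            // actives must already have been suspended and committed during revocation
            if (task.state() != Task.State.SUSPENDED) {
                throw new IllegalStateException(
                    "Active task " + task.id() + " should have been suspended");
            }
        } else {
            // standbys are not revoked, so run the full sequence here
            task.suspend();
            task.prepareCommit();
            task.postCommit();
        }
    }
}
```

Failing fast with IllegalStateException surfaces lifecycle bugs at the rebalance boundary instead of letting an unsuspended task be closed with uncommitted progress.
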
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 003682e961f..a4875fbaee9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -58,6 +58,10 @@ public class OffsetCheckpoint {
 
     private static final int VERSION = 0;
 
+    // Use a negative sentinel when we don't know the offset instead of skipping it to distinguish it from dirty state
+    // and use -2 as the -1 sentinel may be taken by some producer errors
+    public static final long OFFSET_UNKNOWN = -2;
+
     private final File file;
     private final Object lock;
 
@@ -91,7 +95,7 @@ public class OffsetCheckpoint {
             for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
                 final TopicPartition tp = entry.getKey();
                 final Long offset = entry.getValue();
-                if (offset >= 0L) {
+                if (isValid(offset)) {
                     writeEntry(writer, tp, offset);
                 } else {
                     LOG.error("Received offset={} to write to checkpoint file for {}", offset, tp);
@@ -144,7 +148,7 @@ public class OffsetCheckpoint {
             final int version = readInt(reader);
             switch (version) {
                 case 0:
-                    final int expectedSize = readInt(reader);
+                    int expectedSize = readInt(reader);
                     final Map<TopicPartition, Long> offsets = new HashMap<>();
                     String line = reader.readLine();
                     while (line != null) {
@@ -158,10 +162,11 @@ public class OffsetCheckpoint {
                         final int partition = Integer.parseInt(pieces[1]);
                         final TopicPartition tp = new TopicPartition(topic, partition);
                         final long offset = Long.parseLong(pieces[2]);
-                        if (offset >= 0L) {
+                        if (isValid(offset)) {
                             offsets.put(tp, offset);
                         } else {
                             LOG.warn("Read offset={} from checkpoint file for {}", offset, tp);
+                            --expectedSize;
                         }
 
                         line = reader.readLine();
@@ -204,4 +209,8 @@ public class OffsetCheckpoint {
         return file.getAbsolutePath();
    }
 
+    private boolean isValid(final long offset) {
+        return offset >= 0L || offset == OFFSET_UNKNOWN;
+    }
+
 }
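
Reviewer note: the version-0 checkpoint file is plain text: a version line, a line with the expected number of entries, then one `topic partition offset` triple per line. With this patch an entry may now carry the -2 sentinel. For example (topic name hypothetical; real files contain only the left-hand column, the arrows are annotations):

```
0                   <- version
2                   <- expected number of entries
my-changelog 0 42   <- a normal committed offset
my-changelog 1 -2   <- OFFSET_UNKNOWN: store registered, offset not yet known
```

The `--expectedSize` decrement pairs with the size-consistency check at the end of read() (outside the visible hunks): invalid entries are skipped rather than parsed into the map, so the declared count must shrink in step, which is also why `expectedSize` loses its `final`.
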
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index cc80d08ce83..0a1d8740bfc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -32,9 +32,12 @@ import org.junit.Test;
 
 import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEntry;
 import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 public class OffsetCheckpointTest {
 
@@ -82,7 +85,7 @@ public class OffsetCheckpointTest {
     }
 
     @Test
-    public void shouldSkipNegativeOffsetsDuringRead() throws IOException {
+    public void shouldSkipInvalidOffsetsDuringRead() throws IOException {
         final File file = TestUtils.tempFile();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
 
@@ -91,20 +94,38 @@ public class OffsetCheckpointTest {
             offsets.put(new TopicPartition(topic, 0), -1L);
 
             writeVersion0(offsets, file);
+            assertTrue(checkpoint.read().isEmpty());
         } finally {
             checkpoint.delete();
         }
     }
 
     @Test
-    public void shouldThrowOnNegativeOffsetInWrite() throws IOException {
+    public void shouldReadAndWriteSentinelOffset() throws IOException {
+        final File f = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+
+        try {
+            final Map<TopicPartition, Long> offsetsToWrite = new HashMap<>();
+            offsetsToWrite.put(new TopicPartition(topic, 1), -2L);
+            checkpoint.write(offsetsToWrite);
+
+            final Map<TopicPartition, Long> readOffsets = checkpoint.read();
+            assertThat(readOffsets.get(new TopicPartition(topic, 1)), equalTo(-2L));
+        } finally {
+            checkpoint.delete();
+        }
+    }
+
+    @Test
+    public void shouldThrowOnInvalidOffsetInWrite() throws IOException {
         final File f = TestUtils.tempFile();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
 
         try {
             final Map<TopicPartition, Long> offsets = new HashMap<>();
             offsets.put(new TopicPartition(topic, 0), 0L);
-            offsets.put(new TopicPartition(topic, 1), -1L);
+            offsets.put(new TopicPartition(topic, 1), -1L); // invalid
             offsets.put(new TopicPartition(topic, 2), 2L);
 
             assertThrows(IllegalStateException.class, () -> checkpoint.write(offsets));
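
Reviewer note: taken together, the checkpoint semantics after this patch are: offsets >= 0 and the -2 sentinel round-trip through the file, while other negative offsets are rejected on write and skipped with a warning on read. A hedged usage sketch of the public `OffsetCheckpoint` API exercised by the tests above; the topic name and demo class are illustrative only:

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

public class CheckpointSentinelDemo {
    public static void main(final String[] args) throws Exception {
        final OffsetCheckpoint checkpoint =
            new OffsetCheckpoint(File.createTempFile("demo", ".checkpoint"));

        final Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-changelog", 0), 42L);
        offsets.put(new TopicPartition("my-changelog", 1), OffsetCheckpoint.OFFSET_UNKNOWN);

        checkpoint.write(offsets);              // both entries accepted
        System.out.println(checkpoint.read());  // both come back, including the -2 sentinel

        offsets.put(new TopicPartition("my-changelog", 2), -1L);
        checkpoint.write(offsets);              // throws IllegalStateException: -1 is still invalid
    }
}
```
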