KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory (#8962)

Two more edge cases I found that produce extra TaskCorruptedExceptions while playing around with the failing eos-beta upgrade test (sadly, these are unrelated problems, as the test still fails with these fixes in place).

* Need to write the checkpoint when recycling a standby: although we do preserve the changelog offsets when recycling a task, and should therefore write those offsets when the new task is itself closed, we do NOT write the checkpoint for uninitialized tasks. So if the new task is ultimately closed before it gets out of the CREATED state, the offsets will not be written and we can get a TaskCorruptedException.
* We do not write the checkpoint file if the current offset map is empty; however, under EOS the checkpoint file is used not only for restoration but also for detecting a clean shutdown. Although skipping a dummy checkpoint file does not actually violate correctness, since we would re-bootstrap from the log-start-offset anyway, it throws an unnecessary TaskCorruptedException, which has its own overhead (see the sketch below).
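
For illustration, here is a minimal, self-contained sketch of the sentinel approach this fix relies on: an offset that is not yet known is written to the checkpoint as a reserved negative value instead of being skipped, and is mapped back to "unknown" on read. The class and method names below are illustrative only, not the actual Kafka Streams API; the real pieces are `checkpointableOffsetFromChangelogOffset`, `changelogOffsetFromCheckpointedOffset`, and `OffsetCheckpoint.OFFSET_UNKNOWN` in the diff below.

```java
// Illustrative sketch only (not the Kafka Streams implementation): how an
// uninitialized changelog offset can round-trip through a checkpoint file
// using a reserved sentinel instead of being skipped.
public final class CheckpointOffsetSentinelSketch {

    // -2 is used because the -1 sentinel may already be taken by some producer errors.
    public static final long OFFSET_UNKNOWN = -2L;

    // Before writing: a null (not yet initialized) changelog offset becomes the sentinel.
    static long toCheckpointableOffset(final Long changelogOffset) {
        return changelogOffset != null ? changelogOffset : OFFSET_UNKNOWN;
    }

    // After reading: the sentinel maps back to null so the offset is treated as unknown.
    static Long toChangelogOffset(final long checkpointedOffset) {
        return checkpointedOffset != OFFSET_UNKNOWN ? checkpointedOffset : null;
    }

    // A checkpoint entry is valid if it is a real (non-negative) offset or the sentinel.
    static boolean isValidCheckpointValue(final long offset) {
        return offset >= 0L || offset == OFFSET_UNKNOWN;
    }

    public static void main(final String[] args) {
        final long written = toCheckpointableOffset(null);   // -2, still written to the file
        final Long restored = toChangelogOffset(written);    // null, i.e. "offset unknown"
        System.out.println("written=" + written + ", restored=" + restored
                + ", valid=" + isValidCheckpointValue(written));
    }
}
```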

Reviewers: John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Author: A. Sophie Blee-Goldman
Committer: Guozhang Wang
Date: 2020-07-06 17:16:12 -07:00
Parent: 92c23d2d19
Commit: 5bea1a423b
4 changed files with 70 additions and 13 deletions

ProcessorStateManager.java

@@ -47,6 +47,7 @@ import static java.lang.String.format;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.OFFSET_UNKNOWN;
/**
* ProcessorStateManager is the source of truth for the current offset for each state store,
@@ -225,7 +226,8 @@ public class ProcessorStateManager implements StateManager {
log.info("State store {} is not logged and hence would not be restored", store.stateStore.name());
} else if (store.offset() == null) {
if (loadedCheckpoints.containsKey(store.changelogPartition)) {
store.setOffset(loadedCheckpoints.remove(store.changelogPartition));
final Long offset = changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
store.setOffset(offset);
log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
store.stateStore.name(), store.offset, store.changelogPartition);
@@ -538,10 +540,10 @@
// store is logged, persistent, not corrupted, and has a valid current offset
if (storeMetadata.changelogPartition != null &&
storeMetadata.stateStore.persistent() &&
storeMetadata.offset != null &&
!storeMetadata.corrupted) {
checkpointingOffsets.put(storeMetadata.changelogPartition, storeMetadata.offset);
final long checkpointableOffset = checkpointableOffsetFromChangelogOffset(storeMetadata.offset);
checkpointingOffsets.put(storeMetadata.changelogPartition, checkpointableOffset);
}
}
@@ -578,4 +580,14 @@
return found.isEmpty() ? null : found.get(0);
}
// Pass in a sentinel value to checkpoint when the changelog offset is not yet initialized/known
private long checkpointableOffsetFromChangelogOffset(final Long offset) {
return offset != null ? offset : OFFSET_UNKNOWN;
}
// Convert the written offsets in the checkpoint file back to the changelog offset
private Long changelogOffsetFromCheckpointedOffset(final long offset) {
return offset != OFFSET_UNKNOWN ? offset : null;
}
}

TaskManager.java

@@ -35,6 +35,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
@@ -242,9 +243,14 @@ public class TaskManager {
for (final Task task : tasksToClose) {
try {
if (!task.isActive()) {
// Active tasks should have already been suspended and committed during handleRevocation, but
// standbys must be suspended/committed/closed all here
if (task.isActive()) {
// Active tasks are revoked and suspended/committed during #handleRevocation
if (!task.state().equals(State.SUSPENDED)) {
log.error("Active task {} should be suspended prior to attempting to close but was in {}",
task.id(), task.state());
throw new IllegalStateException("Active task " + task.id() + " should have been suspended");
}
} else {
task.suspend();
task.prepareCommit();
task.postCommit();
@@ -268,10 +274,19 @@
final Task newTask;
try {
if (oldTask.isActive()) {
if (!oldTask.state().equals(State.SUSPENDED)) {
// Active tasks are revoked and suspended/committed during #handleRevocation
log.error("Active task {} should be suspended prior to attempting to close but was in {}",
oldTask.id(), oldTask.state());
throw new IllegalStateException("Active task " + oldTask.id() + " should have been suspended");
}
final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
cleanUpTaskProducer(oldTask, taskCloseExceptions);
} else {
oldTask.suspend(); // Only need to suspend transitioning standbys, actives should be suspended already
oldTask.suspend();
oldTask.prepareCommit();
oldTask.postCommit();
final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id());
newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer);
}

OffsetCheckpoint.java

@@ -58,6 +58,10 @@ public class OffsetCheckpoint {
private static final int VERSION = 0;
// Use a negative sentinel when we don't know the offset instead of skipping it to distinguish it from dirty state
// and use -2 as the -1 sentinel may be taken by some producer errors
public static final long OFFSET_UNKNOWN = -2;
private final File file;
private final Object lock;
@@ -91,7 +95,7 @@
for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
final TopicPartition tp = entry.getKey();
final Long offset = entry.getValue();
if (offset >= 0L) {
if (isValid(offset)) {
writeEntry(writer, tp, offset);
} else {
LOG.error("Received offset={} to write to checkpoint file for {}", offset, tp);
@@ -144,7 +148,7 @@
final int version = readInt(reader);
switch (version) {
case 0:
final int expectedSize = readInt(reader);
int expectedSize = readInt(reader);
final Map<TopicPartition, Long> offsets = new HashMap<>();
String line = reader.readLine();
while (line != null) {
@@ -158,10 +162,11 @@
final int partition = Integer.parseInt(pieces[1]);
final TopicPartition tp = new TopicPartition(topic, partition);
final long offset = Long.parseLong(pieces[2]);
if (offset >= 0L) {
if (isValid(offset)) {
offsets.put(tp, offset);
} else {
LOG.warn("Read offset={} from checkpoint file for {}", offset, tp);
--expectedSize;
}
line = reader.readLine();
@@ -204,4 +209,8 @@
return file.getAbsolutePath();
}
private boolean isValid(final long offset) {
return offset >= 0L || offset == OFFSET_UNKNOWN;
}
}

OffsetCheckpointTest.java

@@ -32,9 +32,12 @@ import org.junit.Test;
import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEntry;
import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class OffsetCheckpointTest {
@@ -82,7 +85,7 @@ public class OffsetCheckpointTest {
}
@Test
public void shouldSkipNegativeOffsetsDuringRead() throws IOException {
public void shouldSkipInvalidOffsetsDuringRead() throws IOException {
final File file = TestUtils.tempFile();
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
@@ -91,20 +94,38 @@
offsets.put(new TopicPartition(topic, 0), -1L);
writeVersion0(offsets, file);
assertTrue(checkpoint.read().isEmpty());
} finally {
checkpoint.delete();
}
}
@Test
public void shouldThrowOnNegativeOffsetInWrite() throws IOException {
public void shouldReadAndWriteSentinelOffset() throws IOException {
final File f = TestUtils.tempFile();
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
try {
final Map<TopicPartition, Long> offsetsToWrite = new HashMap<>();
offsetsToWrite.put(new TopicPartition(topic, 1), -2L);
checkpoint.write(offsetsToWrite);
final Map<TopicPartition, Long> readOffsets = checkpoint.read();
assertThat(readOffsets.get(new TopicPartition(topic, 1)), equalTo(-2L));
} finally {
checkpoint.delete();
}
}
@Test
public void shouldThrowOnInvalidOffsetInWrite() throws IOException {
final File f = TestUtils.tempFile();
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
try {
final Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition(topic, 0), 0L);
offsets.put(new TopicPartition(topic, 1), -1L);
offsets.put(new TopicPartition(topic, 1), -1L); // invalid
offsets.put(new TopicPartition(topic, 2), 2L);
assertThrows(IllegalStateException.class, () -> checkpoint.write(offsets));