KAFKA-10144: clean up corrupted standby tasks before attempting a commit (#8849)

We need to make sure that corrupted standby tasks are actually cleaned up upon a TaskCorruptedException. However due to the commit prior to invoking handleCorruption, it's possible to throw a TaskMigratedException before actually cleaning up any of the corrupted tasks.

This is fine for active tasks since handleLostAll will finish up the job, but it does nothing with standby tasks. We should make sure that standby tasks are handled before attempting to commit (which we can do, since we don't need to commit anything for the corrupted standbys)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
A. Sophie Blee-Goldman 2020-06-12 16:21:57 -07:00 committed by GitHub
parent 3d45e1f09e
commit 03ed08d0d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 146 additions and 95 deletions

View File

@ -214,9 +214,7 @@
files="RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java"/>
<suppress checks="JavaNCSS"
files="KStreamKStreamJoinTest.java"/>
<suppress checks="JavaNCSS"
files="SmokeTestDriver.java"/>
files="(KStreamKStreamJoinTest|SmokeTestDriver|TaskManagerTest).java"/>
<suppress checks="NPathComplexity"
files="EosTestDriver|KStreamKStreamJoinTest.java|RelationalSmokeTest.java|SmokeTestDriver.java|KStreamKStreamLeftJoinTest.java|KTableKTableForeignKeyJoinIntegrationTest.java"/>

View File

@ -137,7 +137,7 @@ public class StandbyTask extends AbstractTask implements Task {
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
if (state() == State.RUNNING || state() == State.SUSPENDED) {
stateMgr.flush();
log.info("Task ready for committing");
log.debug("Prepared task for committing");
} else {
throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
}
@ -152,7 +152,7 @@ public class StandbyTask extends AbstractTask implements Task {
// and the state current offset would be used to checkpoint
stateMgr.checkpoint(Collections.emptyMap());
offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
log.info("Finalized commit");
log.debug("Finalized commit");
} else {
throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
}

View File

@ -443,7 +443,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
}
log.debug("Committed");
log.debug("Finalized commit");
}
private Map<TopicPartition, Long> extractPartitionTimes() {

View File

@ -557,14 +557,6 @@ public class StreamThread extends Thread {
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e);
try {
taskManager.commit(
taskManager.tasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
.collect(Collectors.toSet())
);
taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
} catch (final TaskMigratedException taskMigrated) {
handleTaskMigrated(taskMigrated);

View File

@ -153,10 +153,38 @@ public class TaskManager {
rebalanceInProgress = false;
}
void handleCorruption(final Map<TaskId, Collection<TopicPartition>> taskWithChangelogs) {
for (final Map.Entry<TaskId, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
final TaskId taskId = entry.getKey();
void handleCorruption(final Map<TaskId, Collection<TopicPartition>> tasksWithChangelogs) throws TaskMigratedException {
final Map<Task, Collection<TopicPartition>> corruptedStandbyTasks = new HashMap<>();
final Map<Task, Collection<TopicPartition>> corruptedActiveTasks = new HashMap<>();
for (final Map.Entry<TaskId, Collection<TopicPartition>> taskEntry : tasksWithChangelogs.entrySet()) {
final TaskId taskId = taskEntry.getKey();
final Task task = tasks.get(taskId);
if (task.isActive()) {
corruptedActiveTasks.put(task, taskEntry.getValue());
} else {
corruptedStandbyTasks.put(task, taskEntry.getValue());
}
}
// Make sure to clean up any corrupted standby tasks in their entirety before committing
// since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
closeAndRevive(corruptedStandbyTasks);
commit(tasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !tasksWithChangelogs.containsKey(t.id()))
.collect(Collectors.toSet())
);
closeAndRevive(corruptedActiveTasks);
}
private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWithChangelogs) {
for (final Map.Entry<Task, Collection<TopicPartition>> entry : taskWithChangelogs.entrySet()) {
final Task task = entry.getKey();
// mark corrupted partitions to not be checkpointed, and then close the task as dirty
final Collection<TopicPartition> corruptedPartitions = entry.getValue();

View File

@ -48,7 +48,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
@ -92,7 +91,7 @@ public class StandbyTaskEOSIntegrationTest {
}
@Test
public void surviveWithOneTaskAsStandby() throws ExecutionException, InterruptedException, IOException {
public void surviveWithOneTaskAsStandby() throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
inputTopic,
Collections.singletonList(
@ -112,8 +111,6 @@ public class StandbyTaskEOSIntegrationTest {
final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch);
final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch);
) {
streamInstanceOne.start();
streamInstanceTwo.start();

View File

@ -100,7 +100,6 @@ import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@ -1915,7 +1914,7 @@ public class StreamThreadTest {
}
@Test
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
final Task task1 = mock(Task.class);
@ -1932,14 +1931,7 @@ public class StreamThreadTest {
expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task2.id()).andReturn(taskId2).anyTimes();
expect(taskManager.tasks()).andReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2)
)).anyTimes();
expect(taskManager.commit(singleton(task2))).andReturn(0);
taskManager.handleCorruption(singletonMap(taskId1, emptySet()));
expectLastCall();
taskManager.handleCorruption(corruptedTasksWithChangelogs);
EasyMock.replay(task1, task2, taskManager);
@ -1991,11 +1983,8 @@ public class StreamThreadTest {
expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task2.id()).andReturn(taskId2).anyTimes();
expect(taskManager.tasks()).andReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2)
)).anyTimes();
expect(taskManager.commit(singleton(task2))).andThrow(new TaskMigratedException("Task migrated",
taskManager.handleCorruption(corruptedTasksWithChangelogs);
expectLastCall().andThrow(new TaskMigratedException("Task migrated",
new RuntimeException("non-corrupted task migrated")));
taskManager.handleLostAll();
@ -2087,63 +2076,6 @@ public class StreamThreadTest {
verify(taskManager);
}
@Test
public void shouldNotCommitNonRunningNonCorruptedTasksOnTaskCorruptedException() {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
final Task task1 = mock(Task.class);
final Task task2 = mock(Task.class);
final TaskId taskId1 = new TaskId(0, 0);
final TaskId taskId2 = new TaskId(0, 2);
final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
mkEntry(taskId1, emptySet())
);
expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task1.id()).andReturn(taskId1).anyTimes();
expect(task2.state()).andReturn(Task.State.CREATED).anyTimes();
expect(task2.id()).andReturn(taskId2).anyTimes();
expect(taskManager.tasks()).andReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2)
)).anyTimes();
// expect not to try and commit task2, even though it's not corrupted, because it's not running.
expect(taskManager.commit(emptySet())).andReturn(0);
EasyMock.replay(task1, task2, taskManager);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
null,
consumer,
consumer,
null,
null,
taskManager,
streamsMetrics,
internalTopologyBuilder,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE)
) {
@Override
void runOnce() {
setState(State.PENDING_SHUTDOWN);
throw new TaskCorruptedException(corruptedTasksWithChangelogs);
}
}.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
thread.setState(StreamThread.State.STARTING);
thread.runLoop();
verify(taskManager);
}
@Test
public void shouldLogAndRecordSkippedRecordsForInvalidTimestampsWithBuiltInMetricsVersion0100To24() {
shouldLogAndRecordSkippedRecordsForInvalidTimestamps(StreamsConfig.METRICS_0100_TO_24);

View File

@ -99,6 +99,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
@ -629,6 +630,108 @@ public class TaskManagerTest {
verify(stateManager);
}
@Test
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(taskId00Assignment);
assignment.putAll(taskId01Assignment);
// `handleAssignment`
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
.andStubReturn(asList(corruptedTask, nonCorruptedTask));
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(), is(true));
assertThat(nonCorruptedTask.state(), is(Task.State.RUNNING));
nonCorruptedTask.setCommitNeeded();
taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
assertTrue(nonCorruptedTask.commitPrepared);
}
@Test
public void shouldNotCommitNonRunningNonCorruptedTasks() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
nonRunningNonCorruptedTask.setCommitNeeded();
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(taskId00Assignment);
assignment.putAll(taskId01Assignment);
// `handleAssignment`
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
.andStubReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
replay(activeTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(nonRunningNonCorruptedTask.state(), is(Task.State.CREATED));
taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions));
verify(activeTaskCreator);
assertFalse(nonRunningNonCorruptedTask.commitPrepared);
}
@Test
public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
stateManager.markChangelogAsCorrupted(taskId00Partitions);
replay(stateManager);
final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
throw new TaskMigratedException("You dropped out of the group!", new RuntimeException());
}
};
// handleAssignment
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby));
expect(activeTaskCreator.createTasks(anyObject(), eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive));
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
taskManager.handleAssignment(taskId01Assignment, taskId00Assignment);
assertThat(taskManager.tryToCompleteRestoration(), is(true));
// make sure this will be committed and throw
assertThat(runningNonCorruptedActive.state(), is(Task.State.RUNNING));
assertThat(corruptedStandby.state(), is(Task.State.RUNNING));
runningNonCorruptedActive.setCommitNeeded();
assertThrows(TaskMigratedException.class, () -> taskManager.handleCorruption(singletonMap(taskId00, taskId00Partitions)));
assertThat(corruptedStandby.state(), is(Task.State.CREATED));
}
@Test
public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false);
@ -2567,7 +2670,8 @@ public class TaskManagerTest {
}
private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer,
final ChangelogReader changeLogReader, final boolean changeLogUpdateRequired) {
final ChangelogReader changeLogReader,
final boolean changeLogUpdateRequired) {
final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
expect(consumer.assignment()).andReturn(assignment);
consumer.resume(assignment);