KAFKA-10199: Handle exceptions from state updater (#12519)

1. In the state updater, when handling a task-corrupted exception caused by an invalid restoring offset, first delete the affected partitions from the checkpoint file before reporting the exception back to the stream thread. This mimics the behavior of the stream thread's TaskManager#handleCorruption#closeDirtyAndRevive. Doing it inside the restore thread is cleaner, and it also enables a future optimization of deleting only the corrupted partitions rather than all of them.
2. In the task manager, handle the drained exceptions as follows (mirroring the handling of exceptions from handleAssignment): 1) task-migrated: thrown all the way up to the stream thread to be handled by handleTaskMigrated; 2) any fatal Streams exception: thrown all the way up to the stream thread to trigger the exception handler; 3) task-corrupted: thrown to the stream thread to be handled by handleCorruption. Note that for 3), we specifically distinguish whether the corrupted tasks are already closed (when they are thrown from handleAssignment) or not yet closed (when they are thrown from the state updater). The precedence applied when several exception types are drained together is sketched below.
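
A minimal sketch of that precedence, for illustration only: the class and method names (ExceptionPrecedenceSketch, maybeThrow) are made up here, and the real logic lives in TaskManager#maybeThrowTaskExceptions in the diff below. A fatal exception wins, then a task-migrated exception; otherwise all corrupted task ids are merged into a single TaskCorruptedException.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;

// Illustrative sketch only (not part of this commit): shows the precedence used when
// several drained exceptions must be surfaced in a single call.
final class ExceptionPrecedenceSketch {

    static void maybeThrow(final Map<TaskId, RuntimeException> taskExceptions) {
        if (taskExceptions.isEmpty()) {
            return;
        }
        final Set<TaskId> corruptedTaskIds = new HashSet<>();
        StreamsException lastFatal = null;
        TaskMigratedException lastMigrated = null;
        for (final Map.Entry<TaskId, RuntimeException> entry : taskExceptions.entrySet()) {
            final TaskId taskId = entry.getKey();
            final RuntimeException exception = entry.getValue();
            if (exception instanceof TaskMigratedException) {
                // handled by the stream thread via handleTaskMigrated
                lastMigrated = (TaskMigratedException) exception;
            } else if (exception instanceof TaskCorruptedException) {
                // corrupted tasks are merged and handled via handleCorruption
                corruptedTaskIds.add(taskId);
            } else if (exception instanceof StreamsException) {
                // fatal: goes to the thread's exception handler
                lastFatal = (StreamsException) exception;
            } else if (exception instanceof KafkaException) {
                lastFatal = new StreamsException(exception, taskId);
            } else {
                lastFatal = new StreamsException("Unexpected error for task " + taskId, exception, taskId);
            }
        }
        if (lastFatal != null) {
            throw lastFatal;
        } else if (lastMigrated != null) {
            throw lastMigrated;
        } else {
            throw new TaskCorruptedException(corruptedTaskIds);
        }
    }
}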

Reviewers: Bruno Cadonna <cadonna@apache.org>
Guozhang Wang 2022-09-02 17:50:23 -07:00 committed by GitHub
parent 7ec10ce19a
commit 8380d2edf4
5 changed files with 235 additions and 126 deletions

DefaultStateUpdater.java

@@ -175,10 +175,20 @@ public class DefaultStateUpdater implements StateUpdater {
throw new IllegalStateException("Task " + taskId + " is corrupted but is not updating. " + BUG_ERROR_MESSAGE);
}
corruptedTasks.add(corruptedTask);
removeCheckpointForCorruptedTask(corruptedTask);
}
addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new ExceptionAndTasks(corruptedTasks, taskCorruptedException));
}
// TODO: we can let the exception encode the actual corrupted changelog partitions and only
// mark those instead of marking all changelogs
private void removeCheckpointForCorruptedTask(final Task task) {
task.markChangelogAsCorrupted(task.changelogPartitions());
// we need to enforce a checkpoint that removes the corrupted partitions
task.maybeCheckpoint(true);
}
private void handleStreamsException(final StreamsException streamsException) {
log.info("Encountered streams exception: ", streamsException);
if (streamsException.taskId().isPresent()) {

TaskManager.java

@@ -229,10 +229,12 @@ public class TaskManager {
private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
for (final Task task : taskWithChangelogs) {
if (task.state() != State.CLOSED) {
final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
// mark corrupted partitions to not be checkpointed, and then close the task as dirty
if (markAsCorrupted) {
// TODO: this step should be removed as we complete migrating to state updater
if (markAsCorrupted && stateUpdater == null) {
task.markChangelogAsCorrupted(corruptedPartitions);
}
@@ -256,7 +258,7 @@ public class TaskManager {
log.error("Error suspending corrupted task {} ", task.id(), swallow);
}
task.closeDirty();
}
// For active tasks pause their input partitions so we won't poll any more records
// for this task until it has been re-initialized;
// Note, closeDirty already clears the partition-group for the task.
@@ -322,40 +324,54 @@ public class TaskManager {
final Map<TaskId, RuntimeException> taskCloseExceptions = closeAndRecycleTasks(tasksToRecycle, tasksToCloseClean);
throwTaskExceptions(taskCloseExceptions);
maybeThrowTaskExceptions(taskCloseExceptions);
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
}
private void throwTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) {
// Wrap and throw the exception in the following order
// if at least one of the exception is a non-streams exception, then wrap and throw since it should be handled by thread's handler
// if at least one of the exception is a streams exception, then directly throw since it should be handled by thread's handler
// if at least one of the exception is a task-migrated exception, then directly throw since it indicates all tasks are lost
// otherwise, all the exceptions are task-corrupted, then merge their tasks and throw a single one
// TODO: move task-corrupted and task-migrated out of the public errors package since they are internal errors and always be
// handled by Streams library itself
private void maybeThrowTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) {
if (!taskExceptions.isEmpty()) {
log.error("Get exceptions for the following tasks: {}", taskExceptions);
final Set<TaskId> aggregatedCorruptedTaskIds = new HashSet<>();
StreamsException lastFatal = null;
TaskMigratedException lastTaskMigrated = null;
for (final Map.Entry<TaskId, RuntimeException> entry : taskExceptions.entrySet()) {
if (!(entry.getValue() instanceof TaskMigratedException)) {
final TaskId taskId = entry.getKey();
final RuntimeException exception = entry.getValue();
if (exception instanceof StreamsException) {
((StreamsException) exception).setTaskId(taskId);
throw exception;
} else if (exception instanceof KafkaException) {
throw new StreamsException(exception, taskId);
if (exception instanceof TaskMigratedException) {
lastTaskMigrated = (TaskMigratedException) exception;
} else if (exception instanceof TaskCorruptedException) {
log.warn("Encounter corrupted task" + taskId + ", will group it with other corrupted tasks " +
"and handle together", exception);
aggregatedCorruptedTaskIds.add(taskId);
} else {
throw new StreamsException(
"Unexpected failure to close " + taskExceptions.size() +
" task(s) [" + taskExceptions.keySet() + "]. " +
"First unexpected exception (for task " + taskId + ") follows.",
exception,
taskId
);
((StreamsException) exception).setTaskId(taskId);
lastFatal = (StreamsException) exception;
}
} else if (exception instanceof KafkaException) {
lastFatal = new StreamsException(exception, taskId);
} else {
lastFatal = new StreamsException("Encounter unexpected fatal error for task " + taskId, exception, taskId);
}
}
// If all exceptions are task-migrated, we would just throw the first one. No need to wrap with a
// StreamsException since TaskMigrated is handled explicitly by the StreamThread
final Map.Entry<TaskId, RuntimeException> first = taskExceptions.entrySet().iterator().next();
throw first.getValue();
if (lastFatal != null) {
throw lastFatal;
} else if (lastTaskMigrated != null) {
throw lastTaskMigrated;
} else {
throw new TaskCorruptedException(aggregatedCorruptedTaskIds);
}
}
}
@@ -655,42 +671,14 @@ public class TaskManager {
} else {
addTasksToStateUpdater();
handleExceptionsFromStateUpdater();
handleRemovedTasksFromStateUpdater();
return handleRestoredTasks(now, offsetResetter);
return handleRestoredTasksFromStateUpdater(now, offsetResetter);
}
}
private boolean handleRestoredTasks(final long now,
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
final Duration timeout = Duration.ZERO;
for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
Set<TopicPartition> inputPartitions;
if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
} else if (tasks.removePendingTaskToCloseClean(task.id())) {
closeTaskClean(task, tasksToCloseDirty, taskExceptions);
} else if (tasks.removePendingTaskToCloseDirty(task.id())) {
tasksToCloseDirty.add(task);
} else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
} else {
transitRestoredTaskToRunning(task, now, offsetResetter);
}
}
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task, false);
}
throwTaskExceptions(taskExceptions);
return !stateUpdater.restoresActiveTasks();
}
private void recycleTask(final Task task,
final Set<TopicPartition> inputPartitions,
final Set<Task> tasksToCloseDirty,
@@ -746,7 +734,7 @@ public class TaskManager {
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
try {
task.completeRestoration(offsetResetter);
tasks.addActiveTask(task);
tasks.addTask(task);
mainConsumer.resume(task.inputPartitions());
task.clearTaskTimeout();
} catch (final TimeoutException timeoutException) {
@@ -767,6 +755,23 @@ public class TaskManager {
}
}
public void handleExceptionsFromStateUpdater() {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
for (final StateUpdater.ExceptionAndTasks exceptionAndTasks : stateUpdater.drainExceptionsAndFailedTasks()) {
final RuntimeException exception = exceptionAndTasks.exception();
final Set<Task> failedTasks = exceptionAndTasks.getTasks();
for (final Task failedTask : failedTasks) {
// need to add task back to the bookkeeping to be handled by the stream thread
tasks.addTask(failedTask);
taskExceptions.put(failedTask.id(), exception);
}
}
maybeThrowTaskExceptions(taskExceptions);
}
private void handleRemovedTasksFromStateUpdater() {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
@@ -788,11 +793,45 @@ public class TaskManager {
}
}
// for tasks that cannot be cleanly closed or recycled, close them dirty
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task, false);
}
throwTaskExceptions(taskExceptions);
maybeThrowTaskExceptions(taskExceptions);
}
private boolean handleRestoredTasksFromStateUpdater(final long now,
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
final Duration timeout = Duration.ZERO;
for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
Set<TopicPartition> inputPartitions;
if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
} else if (tasks.removePendingTaskToCloseClean(task.id())) {
closeTaskClean(task, tasksToCloseDirty, taskExceptions);
} else if (tasks.removePendingTaskToCloseDirty(task.id())) {
tasksToCloseDirty.add(task);
} else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
// if the restored task happens to need an input partition update, we can transition it to running
// right after completing the update as well
transitRestoredTaskToRunning(task, now, offsetResetter);
} else {
transitRestoredTaskToRunning(task, now, offsetResetter);
}
}
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task, false);
}
maybeThrowTaskExceptions(taskExceptions);
return !stateUpdater.restoresActiveTasks();
}
/**

Tasks.java

@@ -157,15 +157,23 @@ class Tasks implements TasksRegistry {
public void addActiveTasks(final Collection<Task> newTasks) {
if (!newTasks.isEmpty()) {
for (final Task activeTask : newTasks) {
addActiveTask(activeTask);
addTask(activeTask);
}
}
}
@Override
public void addActiveTask(final Task task) {
final TaskId taskId = task.id();
public void addStandbyTasks(final Collection<Task> newTasks) {
if (!newTasks.isEmpty()) {
for (final Task standbyTask : newTasks) {
addTask(standbyTask);
}
}
}
@Override
public void addTask(final Task task) {
final TaskId taskId = task.id();
if (activeTasksPerId.containsKey(taskId)) {
throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId);
}
@@ -174,29 +182,14 @@ class Tasks implements TasksRegistry {
throw new IllegalStateException("Attempted to create an active task while we already own its standby: " + taskId);
}
if (task.isActive()) {
activeTasksPerId.put(task.id(), task);
pendingActiveTasksToCreate.remove(task.id());
for (final TopicPartition topicPartition : task.inputPartitions()) {
activeTasksPerPartition.put(topicPartition, task);
}
}
@Override
public void addStandbyTasks(final Collection<Task> newTasks) {
if (!newTasks.isEmpty()) {
for (final Task standbyTask : newTasks) {
final TaskId taskId = standbyTask.id();
if (standbyTasksPerId.containsKey(taskId)) {
throw new IllegalStateException("Attempted to create an standby task that we already own: " + taskId);
}
if (activeTasksPerId.containsKey(taskId)) {
throw new IllegalStateException("Attempted to create an standby task while we already own its active: " + taskId);
}
standbyTasksPerId.put(standbyTask.id(), standbyTask);
}
} else {
standbyTasksPerId.put(task.id(), task);
}
}
@@ -343,14 +336,4 @@ class Tasks implements TasksRegistry {
public boolean contains(final TaskId taskId) {
return getTask(taskId) != null;
}
// for testing only
@Override
public void addTask(final Task task) {
if (task.isActive()) {
activeTasksPerId.put(task.id(), task);
} else {
standbyTasksPerId.put(task.id(), task);
}
}
}

TasksRegistry.java

@@ -57,10 +57,10 @@ public interface TasksRegistry {
void addActiveTasks(final Collection<Task> tasks);
void addActiveTask(final Task task);
void addStandbyTasks(final Collection<Task> tasks);
void addTask(final Task task);
void removeTask(final Task taskToRemove);
void replaceActiveWithStandby(final StandbyTask standbyTask);
@@ -86,6 +86,4 @@ public interface TasksRegistry {
Set<TaskId> allTaskIds();
boolean contains(final TaskId taskId);
void addTask(final Task task);
}

TaskManagerTest.java

@@ -597,7 +597,7 @@ public class TaskManagerTest {
Mockito.verify(task).completeRestoration(noOpResetter);
Mockito.verify(task).clearTaskTimeout();
Mockito.verify(tasks).addActiveTask(task);
Mockito.verify(tasks).addTask(task);
verify(consumer);
}
@@ -615,7 +615,7 @@ public class TaskManagerTest {
taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter);
Mockito.verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), Mockito.eq(timeoutException));
Mockito.verify(tasks, never()).addActiveTask(task);
Mockito.verify(tasks, never()).addTask(task);
Mockito.verify(task, never()).clearTaskTimeout();
verify(consumer);
}
@@ -873,7 +873,7 @@ public class TaskManagerTest {
taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter);
Mockito.verify(tasks).addActiveTask(taskToTransitToRunning);
Mockito.verify(tasks).addTask(taskToTransitToRunning);
Mockito.verify(stateUpdater).add(recycledStandbyTask);
Mockito.verify(stateUpdater).add(recycledStandbyTask);
Mockito.verify(taskToCloseClean).closeClean();
@@ -881,6 +881,86 @@ public class TaskManagerTest {
Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), isNull());
}
@Test
public void shouldRethrowStreamsExceptionFromStateUpdater() {
final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final StreamsException exception = new StreamsException("boom!");
final StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(
Collections.singleton(statefulTask),
exception
);
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final StreamsException thrown = assertThrows(
StreamsException.class,
() -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter)
);
assertEquals(exception, thrown);
assertEquals(statefulTask.id(), thrown.taskId().get());
}
@Test
public void shouldRethrowRuntimeExceptionFromStateUpdater() {
final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final RuntimeException exception = new RuntimeException("boom!");
final StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(
Collections.singleton(statefulTask),
exception
);
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final StreamsException thrown = assertThrows(
StreamsException.class,
() -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter)
);
assertEquals(exception, thrown.getCause());
assertEquals(statefulTask.id(), thrown.taskId().get());
assertEquals("Encounter unexpected fatal error for task 0_0", thrown.getMessage());
}
@Test
public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() {
final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId01Partitions).build();
final StateUpdater.ExceptionAndTasks exceptionAndTasks0 = new StateUpdater.ExceptionAndTasks(
Collections.singleton(statefulTask0),
new TaskCorruptedException(Collections.singleton(taskId00))
);
final StateUpdater.ExceptionAndTasks exceptionAndTasks1 = new StateUpdater.ExceptionAndTasks(
Collections.singleton(statefulTask1),
new TaskCorruptedException(Collections.singleton(taskId01))
);
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0, exceptionAndTasks1));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskCorruptedException thrown = assertThrows(
TaskCorruptedException.class,
() -> taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter)
);
assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
assertEquals("Tasks [0_1, 0_0] are corrupted and hence needs to be re-initialized", thrown.getMessage());
}
@Test
public void shouldIdempotentlyUpdateSubscriptionFromActiveAssignment() {
final TopicPartition newTopicPartition = new TopicPartition("topic2", 1);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(mkEntry(taskId01, mkSet(t1p1, newTopicPartition)));
@@ -1208,7 +1288,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(
thrown.getMessage(),
is("Unexpected failure to close 1 task(s) [[0_0]]. First unexpected exception (for task 0_0) follows.")
is("Encounter unexpected fatal error for task 0_0")
);
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
}
@@ -1310,7 +1390,7 @@ public class TaskManagerTest {
assertThat(
thrown.getMessage(),
is("Unexpected failure to close 1 task(s) [[0_0]]. First unexpected exception (for task 0_0) follows.")
is("Encounter unexpected fatal error for task 0_0")
);
assertThat(thrown.getCause(), instanceOf(RuntimeException.class));
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
@@ -3588,7 +3668,7 @@ public class TaskManagerTest {
// t1 should always be the first.
assertThat(
thrown.getMessage(),
equalTo("t1 close exception; it means all tasks belonging to this thread should be migrated.")
equalTo("t2 close exception; it means all tasks belonging to this thread should be migrated.")
);
}
@@ -3617,8 +3697,7 @@ public class TaskManagerTest {
() -> taskManager.handleAssignment(emptyMap(), emptyMap())
);
// Fatal exception thrown first.
assertThat(thrown.getMessage(), equalTo("Unexpected failure to close 2 task(s) [[0_1, 0_2]]. " +
"First unexpected exception (for task 0_2) follows."));
assertThat(thrown.getMessage(), equalTo("Encounter unexpected fatal error for task 0_2"));
assertThat(thrown.getCause().getMessage(), equalTo("t2 illegal state exception"));
}