KAFKA-14299: Never transition to UpdateStandby twice (#12762)

In two situations, the current code could transition the ChangelogReader
to UpdateStandby when already in that state, causing an IllegalStateException. 
Namely these two cases are:

1. When only standby tasks are restoring and one of them crashes.
2. When only standby tasks are restoring and one of them is paused.

This change fixes both issues by only transitioning if the paused or
failed task is an active task.

Reviewer: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Lucas Brutschy 2022-10-19 09:29:19 +02:00 committed by GitHub
parent a692223a44
commit 2c8f14c57e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 2 deletions

View File

@ -214,8 +214,10 @@ public class DefaultStateUpdater implements StateUpdater {
private void addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(final ExceptionAndTasks exceptionAndTasks) { private void addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(final ExceptionAndTasks exceptionAndTasks) {
exceptionsAndFailedTasks.add(exceptionAndTasks); exceptionsAndFailedTasks.add(exceptionAndTasks);
exceptionAndTasks.getTasks().stream().map(Task::id).forEach(updatingTasks::remove); exceptionAndTasks.getTasks().stream().map(Task::id).forEach(updatingTasks::remove);
if (exceptionAndTasks.getTasks().stream().anyMatch(Task::isActive)) {
transitToUpdateStandbysIfOnlyStandbysLeft(); transitToUpdateStandbysIfOnlyStandbysLeft();
} }
}
private void addToExceptionsAndFailedTasksThenClearUpdatingTasks(final ExceptionAndTasks exceptionAndTasks) { private void addToExceptionsAndFailedTasksThenClearUpdatingTasks(final ExceptionAndTasks exceptionAndTasks) {
exceptionsAndFailedTasks.add(exceptionAndTasks); exceptionsAndFailedTasks.add(exceptionAndTasks);
@ -310,7 +312,9 @@ public class DefaultStateUpdater implements StateUpdater {
task.maybeCheckpoint(true); task.maybeCheckpoint(true);
pausedTasks.put(taskId, task); pausedTasks.put(taskId, task);
updatingTasks.remove(taskId); updatingTasks.remove(taskId);
if (task.isActive()) {
transitToUpdateStandbysIfOnlyStandbysLeft(); transitToUpdateStandbysIfOnlyStandbysLeft();
}
log.debug((task.isActive() ? "Active" : "Standby") log.debug((task.isActive() ? "Active" : "Standby")
+ " task " + task.id() + " was paused from the updating tasks and added to the paused tasks."); + " task " + task.id() + " was paused from the updating tasks and added to the paused tasks.");
} }

View File

@ -659,6 +659,27 @@ class DefaultStateUpdaterTest {
orderVerifier.verify(changelogReader).transitToUpdateStandby(); orderVerifier.verify(changelogReader).transitToUpdateStandby();
} }
@Test
public void shouldNotTransitToStandbyAgainAfterStandbyTaskFailed() throws Exception {
final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
final StandbyTask task2 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2)
);
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(task1.id()));
final ExceptionAndTasks expectedExceptionAndTasks = new ExceptionAndTasks(mkSet(task1), taskCorruptedException);
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
doThrow(taskCorruptedException).doNothing().when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verify(changelogReader, times(1)).transitToUpdateStandby();
}
@Test @Test
public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception { public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
@ -862,6 +883,22 @@ class DefaultStateUpdaterTest {
verify(changelogReader, times(1)).transitToUpdateStandby(); verify(changelogReader, times(1)).transitToUpdateStandby();
} }
@Test
public void shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() throws Exception {
final StandbyTask task1 = standbyTask(TASK_A_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
final StandbyTask task2 = standbyTask(TASK_B_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
verifyUpdatingTasks(task1, task2);
when(topologyMetadata.isPaused("A")).thenReturn(true);
verifyPausedTasks(task1);
verify(changelogReader, times(1)).transitToUpdateStandby();
}
private void shouldPauseStatefulTask(final Task task) throws Exception { private void shouldPauseStatefulTask(final Task task) throws Exception {
stateUpdater.start(); stateUpdater.start();
stateUpdater.add(task); stateUpdater.add(task);