KAFKA-17085: Handle tasks in state updater before tasks in task registry (#16555) (#16561)

When a active tasks are revoked they land as suspended tasks in the
task registry. If they are then reassigned, the tasks are resumed
and put into restoration. On assignment, we first handle the tasks
in the task registry and then the tasks in the state updater. That
means that if a task is re-assigned after a revocation, we remove
the suspended task from the task registry, resume it, add it
to the state updater, and then remove it from the list of tasks
to create. After that we iterate over the tasks in the state
updater and remove from there the tasks that are not in the list
of tasks to create. However, now the state updater contains the
resumed tasks that we removed from the task registry before but
are no more in the list of tasks to create. In other words, we
remove the resumed tasks from the state updater and close them
although we just got them assigned.

This commit ensures that we first handle the tasks in the
state updater and then the tasks in the task registry.

Cherry-pick of 4ecbb75

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Bruno Cadonna 2024-07-11 10:41:55 +02:00 committed by GitHub
parent ede289db93
commit d7fbdcfd83
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 30 additions and 1 deletions

View File

@ -508,8 +508,8 @@ public class TaskManager {
final Set<Task> tasksToCloseClean,
final Map<TaskId, RuntimeException> failedTasks) {
handleTasksPendingInitialization();
handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
handleRestoringAndUpdatingTasks(activeTasksToCreate, standbyTasksToCreate, failedTasks);
handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
}
private void handleTasksPendingInitialization() {

View File

@ -714,6 +714,35 @@ public class TaskManagerTest {
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
public void shouldFirstHandleTasksInStateUpdaterThenSuspendedActiveTasksInTaskRegistry() {
final StreamTask reassignedActiveTask1 = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.SUSPENDED)
.withInputPartitions(taskId03Partitions).build();
final StreamTask reassignedActiveTask2 = statefulTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask1));
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedActiveTask2));
when(stateUpdater.remove(reassignedActiveTask2.id()))
.thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(reassignedActiveTask2)));
taskManager.handleAssignment(
mkMap(
mkEntry(reassignedActiveTask1.id(), reassignedActiveTask1.inputPartitions()),
mkEntry(reassignedActiveTask2.id(), taskId00Partitions)
),
Collections.emptyMap()
);
final InOrder inOrder = inOrder(stateUpdater, tasks);
inOrder.verify(stateUpdater).remove(reassignedActiveTask2.id());
inOrder.verify(tasks).removeTask(reassignedActiveTask1);
inOrder.verify(stateUpdater).add(reassignedActiveTask1);
}
@Test
public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() {
final StandbyTask standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions)