mirror of https://github.com/apache/kafka.git
KAFKA-17085: Handle tasks in state updater before tasks in task registry (#16555)
When a active tasks are revoked they land as suspended tasks in the task registry. If they are then reassigned, the tasks are resumed and put into restoration. On assignment, we first handle the tasks in the task registry and then the tasks in the state updater. That means that if a task is re-assigned after a revocation, we remove the suspended task from the task registry, resume it, add it to the state updater, and then remove it from the list of tasks to create. After that we iterate over the tasks in the state updater and remove from there the tasks that are not in the list of tasks to create. However, now the state updater contains the resumed tasks that we removed from the task registry before but are no more in the list of tasks to create. In other words, we remove the resumed tasks from the state updater and close them although we just got them assigned. This commit ensures that we first handle the tasks in the state updater and then the tasks in the task registry. Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
1dd16c4f2e
commit
4ecbb75c1f
|
@ -507,8 +507,8 @@ public class TaskManager {
|
|||
final Set<Task> tasksToCloseClean,
|
||||
final Map<TaskId, RuntimeException> failedTasks) {
|
||||
handleTasksPendingInitialization();
|
||||
handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
|
||||
handleRestoringAndUpdatingTasks(activeTasksToCreate, standbyTasksToCreate, failedTasks);
|
||||
handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
|
||||
}
|
||||
|
||||
private void handleTasksPendingInitialization() {
|
||||
|
|
|
@ -718,6 +718,35 @@ public class TaskManagerTest {
|
|||
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldFirstHandleTasksInStateUpdaterThenSuspendedActiveTasksInTaskRegistry() {
|
||||
final StreamTask reassignedActiveTask1 = statefulTask(taskId03, taskId03ChangelogPartitions)
|
||||
.inState(State.SUSPENDED)
|
||||
.withInputPartitions(taskId03Partitions).build();
|
||||
final StreamTask reassignedActiveTask2 = statefulTask(taskId02, taskId02ChangelogPartitions)
|
||||
.inState(State.RESTORING)
|
||||
.withInputPartitions(taskId02Partitions).build();
|
||||
final TasksRegistry tasks = mock(TasksRegistry.class);
|
||||
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
||||
when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask1));
|
||||
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedActiveTask2));
|
||||
when(stateUpdater.remove(reassignedActiveTask2.id()))
|
||||
.thenReturn(CompletableFuture.completedFuture(new StateUpdater.RemovedTaskResult(reassignedActiveTask2)));
|
||||
|
||||
taskManager.handleAssignment(
|
||||
mkMap(
|
||||
mkEntry(reassignedActiveTask1.id(), reassignedActiveTask1.inputPartitions()),
|
||||
mkEntry(reassignedActiveTask2.id(), taskId00Partitions)
|
||||
),
|
||||
Collections.emptyMap()
|
||||
);
|
||||
|
||||
final InOrder inOrder = inOrder(stateUpdater, tasks);
|
||||
inOrder.verify(stateUpdater).remove(reassignedActiveTask2.id());
|
||||
inOrder.verify(tasks).removeTask(reassignedActiveTask1);
|
||||
inOrder.verify(stateUpdater).add(reassignedActiveTask1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() {
|
||||
final StandbyTask standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions)
|
||||
|
|
Loading…
Reference in New Issue