mirror of https://github.com/apache/kafka.git
KAFKA-16098: Verify pending recycle action when standby is re-assigned (#15168)
When a standby is recycled to an active and then re-assigned as a standby again, it might happen that the recycling is still pending when the standby is reassigned. That causes an illegal state exception from the main consumer since the active task that results from the recycling is actually not assigned to the main consumer anymore, but it was re-assigned as a standby in the most recent rebalance. Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
a63f76970a
commit
fbbfafe1f5
|
@ -574,6 +574,15 @@ public class TaskManager {
|
|||
} else if (standbyTasksToCreate.containsKey(taskId)) {
|
||||
if (task.isActive()) {
|
||||
removeTaskToRecycleFromStateUpdater(taskId, standbyTasksToCreate.get(taskId));
|
||||
} else {
|
||||
if (tasks.removePendingTaskToRecycle(taskId) != null) {
|
||||
log.info(
|
||||
"We were planning on recycling standby task {} to an active task." +
|
||||
"The task got reassigned to this thread as a standby task, so cancel recycling of the task, " +
|
||||
"but add it back to the state updater, since we may have to catch up on the changelog.",
|
||||
taskId);
|
||||
tasks.addPendingTaskToAddBack(taskId);
|
||||
}
|
||||
}
|
||||
standbyTasksToCreate.remove(taskId);
|
||||
} else {
|
||||
|
|
|
@ -652,6 +652,27 @@ public class TaskManagerTest {
|
|||
Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReAddStandbyTaskFromPendingRecycle() {
|
||||
final StandbyTask reassignedStandbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
|
||||
.inState(State.RUNNING)
|
||||
.withInputPartitions(taskId01Partitions).build();
|
||||
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
|
||||
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
||||
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedStandbyTask));
|
||||
when(tasks.removePendingTaskToRecycle(reassignedStandbyTask.id())).thenReturn(taskId01Partitions);
|
||||
|
||||
taskManager.handleAssignment(
|
||||
Collections.emptyMap(),
|
||||
mkMap(mkEntry(reassignedStandbyTask.id(), reassignedStandbyTask.inputPartitions()))
|
||||
);
|
||||
|
||||
Mockito.verify(tasks).removePendingTaskToRecycle(reassignedStandbyTask.id());
|
||||
Mockito.verify(tasks).addPendingTaskToAddBack(reassignedStandbyTask.id());
|
||||
Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
|
||||
Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() {
|
||||
final StandbyTask standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions)
|
||||
|
|
Loading…
Reference in New Issue