From fbbfafe1f556f424bf511697db6f399e5a622aa3 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 10 Jan 2024 17:59:06 +0100 Subject: [PATCH] KAFKA-16098: Verify pending recycle action when standby is re-assigned (#15168) When a standby task is scheduled to be recycled into an active task and is then re-assigned as a standby task again, the recycling may still be pending when the standby task is re-assigned. This causes an illegal state exception from the main consumer, because the active task that results from the recycling is no longer assigned to the main consumer; it was re-assigned as a standby task in the most recent rebalance. To avoid this, the pending recycle action is cancelled when the standby task is re-assigned, and the task is added back to the state updater since it may still have to catch up on the changelog. Reviewer: Lucas Brutschy --- .../processor/internals/TaskManager.java | 9 ++++++++ .../processor/internals/TaskManagerTest.java | 21 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 19c5100a6c8..4c7f3a8eabb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -574,6 +574,15 @@ public class TaskManager { } else if (standbyTasksToCreate.containsKey(taskId)) { if (task.isActive()) { removeTaskToRecycleFromStateUpdater(taskId, standbyTasksToCreate.get(taskId)); + } else { + if (tasks.removePendingTaskToRecycle(taskId) != null) { + log.info( + "We were planning on recycling standby task {} to an active task." 
+ + "The task got reassigned to this thread as a standby task, so cancel recycling of the task, " + + "but add it back to the state updater, since we may have to catch up on the changelog.", + taskId); + tasks.addPendingTaskToAddBack(taskId); + } } standbyTasksToCreate.remove(taskId); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 31ecf4c836b..ba1c91e7f71 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -652,6 +652,27 @@ public class TaskManagerTest { Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } + @Test + public void shouldReAddStandbyTaskFromPendingRecycle() { + final StandbyTask reassignedStandbyTask = standbyTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedStandbyTask)); + when(tasks.removePendingTaskToRecycle(reassignedStandbyTask.id())).thenReturn(taskId01Partitions); + + taskManager.handleAssignment( + Collections.emptyMap(), + mkMap(mkEntry(reassignedStandbyTask.id(), reassignedStandbyTask.inputPartitions())) + ); + + Mockito.verify(tasks).removePendingTaskToRecycle(reassignedStandbyTask.id()); + Mockito.verify(tasks).addPendingTaskToAddBack(reassignedStandbyTask.id()); + Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); + } + @Test public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() { final StandbyTask 
standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions)