From 0349f2310c0b8bc94cda673ac66dba1012d2dba6 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Wed, 10 Jan 2024 10:21:38 +0100 Subject: [PATCH] KAFKA-16097: Add suspended tasks back to the state updater when reassigned (#15163) When a partition is revoked, the corresponding task gets a pending action "SUSPEND". This pending action may overwrite a previous pending action. If the task was previously removed from the state updater, e.g. because we were fenced, the pending action is overwritten with suspend, and in handleAssignment, upon reassignment of that task, the SUSPEND action is removed. Then, once the state updater executes the removal, no pending action is registered anymore, and we run into an IllegalStateException. This commit solves the problem by adding back reassigned tasks to the state updater, since they may have been removed from the state updater for another reason than being restored completely. Reviewers: Bruno Cadonna --- .../internals/PendingUpdateAction.java | 8 ++- .../processor/internals/TaskManager.java | 10 ++- .../streams/processor/internals/Tasks.java | 70 ++++++++++--------- .../processor/internals/TaskManagerTest.java | 21 ++++++ 4 files changed, 75 insertions(+), 34 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java index 382f572351f..b570b141e40 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java @@ -82,5 +82,11 @@ public class PendingUpdateAction { return action; } - + @Override + public String toString() { + return "PendingUpdateAction{" + + "inputPartitions=" + inputPartitions + + ", action=" + action + + '}'; + } } \ No newline at end of file diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index d8e3f41c20b..19c5100a6c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -550,7 +550,15 @@ public class TaskManager { stateUpdater.remove(taskId); } } else if (task.isActive()) { - tasks.removePendingActiveTaskToSuspend(taskId); + if (tasks.removePendingActiveTaskToSuspend(taskId)) { + log.info( + "We were planning on suspending a task {} because it was revoked " + + "The task got reassigned to this thread, so we cancel suspending " + + "of the task, but add it back to the state updater, since we do not know " + + "if it is fully restored yet.", + taskId); + tasks.addPendingTaskToAddBack(taskId); + } if (tasks.removePendingTaskToCloseClean(taskId)) { log.info( "We were planning on closing task {} because we lost one of its partitions." 
+ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index baeaeb70c5c..c9a9a4b0482 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -104,15 +104,12 @@ class Tasks implements TasksRegistry { @Override public Set removePendingTaskToRecycle(final TaskId taskId) { - if (containsTaskIdWithAction(taskId, Action.RECYCLE)) { - return pendingUpdateActions.remove(taskId).getInputPartitions(); - } - return null; + return removePendingUpdateActionWithInputPartitions(taskId, Action.RECYCLE); } @Override public void addPendingTaskToRecycle(final TaskId taskId, final Set inputPartitions) { - pendingUpdateActions.put(taskId, PendingUpdateAction.createRecycleTask(inputPartitions)); + updatePendingUpdateAction(taskId, PendingUpdateAction.createRecycleTask(inputPartitions)); } @Override @@ -122,70 +119,79 @@ class Tasks implements TasksRegistry { @Override public Set removePendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId) { - if (containsTaskIdWithAction(taskId, Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS)) { - return pendingUpdateActions.remove(taskId).getInputPartitions(); - } - return null; + return removePendingUpdateActionWithInputPartitions(taskId, Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS); } @Override public void addPendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId, final Set inputPartitions) { - pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseReviveAndUpdateInputPartition(inputPartitions)); + updatePendingUpdateAction(taskId, PendingUpdateAction.createCloseReviveAndUpdateInputPartition(inputPartitions)); } @Override public Set removePendingTaskToUpdateInputPartitions(final TaskId taskId) { - if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) { - 
return pendingUpdateActions.remove(taskId).getInputPartitions(); - } - return null; + return removePendingUpdateActionWithInputPartitions(taskId, Action.UPDATE_INPUT_PARTITIONS); } @Override public void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set inputPartitions) { - pendingUpdateActions.put(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions)); + updatePendingUpdateAction(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions)); } @Override public boolean removePendingTaskToAddBack(final TaskId taskId) { - if (containsTaskIdWithAction(taskId, Action.ADD_BACK)) { - pendingUpdateActions.remove(taskId); - return true; - } - return false; + return removePendingUpdateAction(taskId, Action.ADD_BACK); } @Override public void addPendingTaskToAddBack(final TaskId taskId) { - pendingUpdateActions.put(taskId, PendingUpdateAction.createAddBack()); + updatePendingUpdateAction(taskId, PendingUpdateAction.createAddBack()); } @Override public boolean removePendingTaskToCloseClean(final TaskId taskId) { - if (containsTaskIdWithAction(taskId, Action.CLOSE_CLEAN)) { - pendingUpdateActions.remove(taskId); - return true; - } - return false; + return removePendingUpdateAction(taskId, Action.CLOSE_CLEAN); } @Override public void addPendingTaskToCloseClean(final TaskId taskId) { - pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseClean()); + updatePendingUpdateAction(taskId, PendingUpdateAction.createCloseClean()); } @Override public boolean removePendingActiveTaskToSuspend(final TaskId taskId) { - if (containsTaskIdWithAction(taskId, Action.SUSPEND)) { - pendingUpdateActions.remove(taskId); + return removePendingUpdateAction(taskId, Action.SUSPEND); + } + + @Override + public void addPendingActiveTaskToSuspend(final TaskId taskId) { + updatePendingUpdateAction(taskId, PendingUpdateAction.createSuspend()); + } + + private Set removePendingUpdateActionWithInputPartitions(final TaskId taskId, final Action action) { + 
if (containsTaskIdWithAction(taskId, action)) { + final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.remove(taskId); + log.info("Removing pending update action {} for task {}", pendingUpdateAction, taskId); + return pendingUpdateAction.getInputPartitions(); + } + return null; + } + + private boolean removePendingUpdateAction(final TaskId taskId, final Action action) { + if (containsTaskIdWithAction(taskId, action)) { + log.info("Removing pending update action {} for task {}", pendingUpdateActions.remove(taskId), taskId); return true; } return false; } - @Override - public void addPendingActiveTaskToSuspend(final TaskId taskId) { - pendingUpdateActions.put(taskId, PendingUpdateAction.createSuspend()); + private void updatePendingUpdateAction(final TaskId taskId, final PendingUpdateAction newAction) { + if (pendingUpdateActions.containsKey(taskId)) { + log.info("Adding pending update action {} for task {}, previous action was {}", + newAction, taskId, pendingUpdateActions.get(taskId)); + } else { + log.info("Adding pending update action {} for task {}, no previous action", newAction, taskId); + } + pendingUpdateActions.put(taskId, newAction); } private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 61e078868bf..1f2fa5cc649 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -631,6 +631,27 @@ public class TaskManagerTest { Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } + @Test + public void shouldRemoveReassignedTaskInStateUpdaterFromPendingSuspend() { + final StreamTask reassignedTask = statefulTask(taskId03, taskId03ChangelogPartitions) + 
.inState(State.RESTORING) + .withInputPartitions(taskId03Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedTask)); + when(tasks.removePendingActiveTaskToSuspend(reassignedTask.id())).thenReturn(true); + + taskManager.handleAssignment( + mkMap(mkEntry(reassignedTask.id(), reassignedTask.inputPartitions())), + Collections.emptyMap() + ); + + Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); + Mockito.verify(tasks).removePendingActiveTaskToSuspend(reassignedTask.id()); + Mockito.verify(tasks).addPendingTaskToAddBack(reassignedTask.id()); + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); + } + @Test public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() { final StandbyTask standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions)