diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index 92dd07ba974..dcceb5f7848 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -258,6 +258,7 @@ class Tasks implements TasksRegistry { @Override public synchronized void clear() { + pendingTasksToInit.clear(); activeTasksPerId.clear(); standbyTasksPerId.clear(); activeTasksPerPartition.clear(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java index e2bd30c20a5..a62d105bb90 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java @@ -207,4 +207,15 @@ public class TasksTest { tasks.addTask(activeTask1); assertTrue(tasks.allNonFailedTasks().contains(activeTask1)); } + + @Test + public void shouldClearPendingToInitTasks() { + final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_B_0)) + .inState(State.CREATED).build(); + tasks.addPendingTasksToInit(Collections.singleton(task)); + + assertTrue(tasks.pendingTasksToInit().contains(task)); + tasks.clear(); + assertFalse(tasks.pendingTasksToInit().contains(task)); + } }