From 5439914c32fa00d634efa7219699f1bc21add839 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Mon, 13 May 2024 14:33:58 +0200 Subject: [PATCH] KAFKA-10199: Shutdown with new remove operation in state updater (#15894) Uses the new remove operation of the state updater that returns a future to shutdown the task manager. Reviewer: Lucas Brutschy --- .../internals/DefaultStateUpdater.java | 64 ++++----- .../processor/internals/StateUpdater.java | 2 + .../processor/internals/TaskManager.java | 122 +++-------------- .../integration/RestoreIntegrationTest.java | 3 - .../internals/DefaultStateUpdaterTest.java | 125 +++++++++--------- .../processor/internals/TaskManagerTest.java | 106 +++++++++------ 6 files changed, 183 insertions(+), 239 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 53ae7e34eb1..ee3501ecf20 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -149,14 +149,23 @@ public class DefaultStateUpdater implements StateUpdater { } catch (final RuntimeException anyOtherException) { handleRuntimeException(anyOtherException); } finally { - removeAddedTasksFromInputQueue(); - removeUpdatingAndPausedTasks(); + clearInputQueue(); + clearUpdatingAndPausedTasks(); updaterMetrics.clear(); shutdownGate.countDown(); log.info("State updater thread stopped"); } } + private void clearInputQueue() { + tasksAndActionsLock.lock(); + try { + tasksAndActions.clear(); + } finally { + tasksAndActionsLock.unlock(); + } + } + // In each iteration: // 1) check if updating tasks need to be paused // 2) check if paused tasks need to be resumed @@ -444,17 +453,10 @@ public class DefaultStateUpdater implements StateUpdater { !isTopologyResumed.get(); } - private void 
removeUpdatingAndPausedTasks() { - changelogReader.clear(); - measureCheckpointLatency(() -> updatingTasks.forEach((id, task) -> { - task.maybeCheckpoint(true); - removedTasks.add(task); - })); + private void clearUpdatingAndPausedTasks() { updatingTasks.clear(); - pausedTasks.forEach((id, task) -> { - removedTasks.add(task); - }); pausedTasks.clear(); + changelogReader.clear(); } private List getTasksAndActions() { @@ -506,16 +508,14 @@ public class DefaultStateUpdater implements StateUpdater { private void removeTask(final TaskId taskId, final CompletableFuture future) { try { - if (!removeUpdatingTask(taskId, future)) { - if (!removePausedTask(taskId, future)) { - if (!removeRestoredTask(taskId, future)) { - if (!removeFailedTask(taskId, future)) { - future.complete(null); - log.warn("Task " + taskId + " could not be removed from the state updater because " - + "the state updater does not own this task."); - } - } - } + if (!removeUpdatingTask(taskId, future) + && !removePausedTask(taskId, future) + && !removeRestoredTask(taskId, future) + && !removeFailedTask(taskId, future)) { + + future.complete(null); + log.warn("Task {} could not be removed from the state updater because the state updater does not" + + " own this task.", taskId); } } catch (final StreamsException streamsException) { handleStreamsException(streamsException); @@ -787,9 +787,12 @@ public class DefaultStateUpdater implements StateUpdater { this.log = logContext.logger(DefaultStateUpdater.class); } - @Override public void start() { if (stateUpdaterThread == null) { + if (!restoredActiveTasks.isEmpty() || !exceptionsAndFailedTasks.isEmpty()) { + throw new IllegalStateException("State updater started with non-empty output queues. 
" + + BUG_ERROR_MESSAGE); + } stateUpdaterThread = new StateUpdaterThread(name, metrics, changelogReader); stateUpdaterThread.start(); shutdownGate = new CountDownLatch(1); @@ -823,23 +826,6 @@ public class DefaultStateUpdater implements StateUpdater { stateUpdaterThread = null; } catch (final InterruptedException ignored) { } - } else { - removeAddedTasksFromInputQueue(); - } - } - - private void removeAddedTasksFromInputQueue() { - tasksAndActionsLock.lock(); - try { - TaskAndAction taskAndAction; - while ((taskAndAction = tasksAndActions.peek()) != null) { - if (taskAndAction.action() == Action.ADD) { - removedTasks.add(taskAndAction.task()); - } - tasksAndActions.poll(); - } - } finally { - tasksAndActionsLock.unlock(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java index e39cbb10c65..9b045a55a82 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java @@ -225,6 +225,7 @@ public interface StateUpdater { *
  <li>{@link StateUpdater#drainRestoredActiveTasks(Duration)}</li>
 *
  <li>{@link StateUpdater#drainRemovedTasks()}</li>
 *
  <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
 + *
  <li>{@link StateUpdater#removeWithFuture(org.apache.kafka.streams.processor.TaskId)}</li>
 * </ul> * * @return set of all tasks managed by the state updater @@ -251,6 +252,7 @@ public interface StateUpdater { *
  <li>{@link StateUpdater#drainRestoredActiveTasks(Duration)}</li>
 *
  <li>{@link StateUpdater#drainRemovedTasks()}</li>
 *
  <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
 + *
  <li>{@link StateUpdater#removeWithFuture(org.apache.kafka.streams.processor.TaskId)}</li>
  • * * * @return {@code true} if the state updater restores active tasks, {@code false} otherwise diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index d01a8c714fc..b6677c48e68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -947,17 +947,15 @@ public class TaskManager { } } - /** Returns true if the task closed clean */ - private boolean closeTaskClean(final Task task, - final Set tasksToCloseDirty, - final Map taskExceptions) { + private void closeTaskClean(final Task task, + final Set tasksToCloseDirty, + final Map taskExceptions) { try { task.suspend(); task.closeClean(); if (task.isActive()) { activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()); } - return true; } catch (final RuntimeException e) { final String uncleanMessage = String.format("Failed to close task %s cleanly. 
" + "Attempting to close remaining tasks before re-throwing:", task.id()); @@ -968,7 +966,6 @@ public class TaskManager { } taskExceptions.putIfAbsent(task.id(), e); - return false; } } @@ -1037,44 +1034,6 @@ public class TaskManager { return taskExceptions; } - private void handleRemovedTasksFromStateUpdater() { - final Map taskExceptions = new LinkedHashMap<>(); - final Set tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id)); - - for (final Task task : stateUpdater.drainRemovedTasks()) { - Set inputPartitions; - if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) { - recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions); - } else if (tasks.removePendingTaskToAddBack(task.id())) { - stateUpdater.add(task); - } else if (tasks.removePendingTaskToCloseClean(task.id())) { - closeTaskClean(task, tasksToCloseDirty, taskExceptions); - } else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) { - if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) { - task.revive(); - task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); - addTaskToStateUpdater(task); - } - } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { - task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); - stateUpdater.add(task); - } else if (tasks.removePendingActiveTaskToSuspend(task.id())) { - task.suspend(); - tasks.addTask(task); - } else { - throw new IllegalStateException("Got a removed task " + task.id() + " from the state updater " + - "that is not for recycle, closing, or updating input partitions; this should not happen"); - } - } - - // for tasks that cannot be cleanly closed or recycled, close them dirty - for (final Task task : tasksToCloseDirty) { - closeTaskDirty(task, false); - } - - maybeThrowTaskExceptions(taskExceptions); - } - private 
void handleRestoredTasksFromStateUpdater(final long now, final java.util.function.Consumer> offsetResetter) { final Duration timeout = Duration.ZERO; @@ -1523,10 +1482,26 @@ public class TaskManager { private void shutdownStateUpdater() { if (stateUpdater != null) { + final Map> futures = new LinkedHashMap<>(); + for (final Task task : stateUpdater.getTasks()) { + final CompletableFuture future = stateUpdater.removeWithFuture(task.id()); + futures.put(task.id(), future); + } + final Set tasksToCloseClean = new HashSet<>(); + final Set tasksToCloseDirty = new HashSet<>(); + addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty); stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE)); - closeFailedTasksFromStateUpdater(); - addRestoredTasksToTaskRegistry(); - addRemovedTasksToTaskRegistry(); + + for (final Task task : tasksToCloseClean) { + tasks.addTask(task); + } + for (final Task task : tasksToCloseDirty) { + closeTaskDirty(task, false); + } + for (final StateUpdater.ExceptionAndTask exceptionAndTask : stateUpdater.drainExceptionsAndFailedTasks()) { + final Task failedTask = exceptionAndTask.task(); + closeTaskDirty(failedTask, false); + } } } @@ -1536,59 +1511,6 @@ public class TaskManager { } } - private void closeFailedTasksFromStateUpdater() { - final Set tasksToCloseDirty = stateUpdater.drainExceptionsAndFailedTasks().stream() - .map(StateUpdater.ExceptionAndTask::task).collect(Collectors.toSet()); - - for (final Task task : tasksToCloseDirty) { - try { - // we call this function only to flush the case if necessary - // before suspending and closing the topology - task.prepareCommit(); - } catch (final RuntimeException swallow) { - log.error("Error flushing caches of dirty task {} ", task.id(), swallow); - } - - try { - task.suspend(); - } catch (final RuntimeException swallow) { - log.error("Error suspending dirty task {}: {}", task.id(), swallow.getMessage()); - } - - task.closeDirty(); - - try { - if (task.isActive()) { - 
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()); - } - } catch (final RuntimeException swallow) { - log.error("Error closing dirty task {}: {}", task.id(), swallow.getMessage()); - } - } - } - - private void addRestoredTasksToTaskRegistry() { - tasks.addActiveTasks(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).stream() - .map(t -> (Task) t) - .collect(Collectors.toSet()) - ); - } - - private void addRemovedTasksToTaskRegistry() { - final Set removedTasks = stateUpdater.drainRemovedTasks(); - final Set removedActiveTasks = new HashSet<>(); - final Iterator iterator = removedTasks.iterator(); - while (iterator.hasNext()) { - final Task task = iterator.next(); - if (task.isActive()) { - iterator.remove(); - removedActiveTasks.add(task); - } - } - tasks.addActiveTasks(removedActiveTasks); - tasks.addStandbyTasks(removedTasks); - } - /** * Closes and cleans up after the provided tasks, including closing their corresponding task producers */ diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index b41e0c3d29f..1db1d2a981e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -547,9 +547,6 @@ public class RestoreIntegrationTest { streams1.close(); } waitForTransitionTo(transitionedStates1, State.NOT_RUNNING, Duration.ofSeconds(60)); - if (stateUpdaterEnabled) { - assertThat(standbyUpdateListener.promotedPartitions.size(), CoreMatchers.equalTo(1)); - } assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(initialStoreCloseCount + 4)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 
62a0db6a4d5..01bd06b2db3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -128,11 +128,55 @@ class DefaultStateUpdaterTest { } @Test - public void shouldShutdownStateUpdater() { + public void shouldShutdownStateUpdater() throws Exception { + final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build(); + final StreamTask restoredStatefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StreamTask failedStatefulTask = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); + when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_B_0)); + final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(TASK_1_1)); + doThrow(taskCorruptedException).when(changelogReader).restore(mkMap( + mkEntry(TASK_1_1, failedStatefulTask), + mkEntry(TASK_0_2, standbyTask) + )); + stateUpdater.add(statelessTask); + stateUpdater.add(restoredStatefulTask); + stateUpdater.add(failedStatefulTask); + stateUpdater.add(standbyTask); stateUpdater.start(); + verifyRestoredActiveTasks(statelessTask, restoredStatefulTask); + verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedStatefulTask)); + verifyUpdatingTasks(standbyTask); + verifyPausedTasks(); stateUpdater.shutdown(Duration.ofMinutes(1)); + verifyRestoredActiveTasks(statelessTask, restoredStatefulTask); + verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedStatefulTask)); + verifyUpdatingTasks(); + verifyPausedTasks(); + verify(changelogReader).clear(); + } + + @Test + public void shouldShutdownStateUpdaterWithPausedTasks() throws Exception { + final 
StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); + final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); + when(topologyMetadata.isPaused(null)).thenReturn(true); + stateUpdater.add(statefulTask); + stateUpdater.add(standbyTask); + stateUpdater.start(); + verifyRestoredActiveTasks(); + verifyExceptionsAndFailedTasks(); + verifyUpdatingTasks(); + verifyPausedTasks(statefulTask, standbyTask); + + stateUpdater.shutdown(Duration.ofMinutes(1)); + + verifyRestoredActiveTasks(); + verifyExceptionsAndFailedTasks(); + verifyUpdatingTasks(); + verifyPausedTasks(); verify(changelogReader).clear(); } @@ -150,76 +194,37 @@ class DefaultStateUpdaterTest { } @Test - public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception { - final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build(); - final StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); - final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); - stateUpdater.add(statelessTask); - stateUpdater.add(statefulTask); - stateUpdater.removeWithFuture(TASK_1_1); - stateUpdater.add(standbyTask); - verifyRemovedTasks(); - - stateUpdater.shutdown(Duration.ofMinutes(1)); - - verifyRemovedTasks(statelessTask, statefulTask, standbyTask); - } - - @Test - public void shouldRemoveUpdatingTasksOnShutdown() throws Exception { - stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE)); - stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, new StreamsConfig(configProps(Integer.MAX_VALUE)), null, changelogReader, topologyMetadata, time); - final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); - final StandbyTask standbyTask = standbyTask(TASK_0_2, 
mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); - when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); + public void shouldThrowIfRestartedWithNonEmptyRestoredTasks() throws Exception { + final StreamTask restoredTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_A_1)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); - stateUpdater.add(activeTask); - stateUpdater.add(standbyTask); - verifyUpdatingTasks(activeTask, standbyTask); - verifyRemovedTasks(); - + stateUpdater.add(restoredTask); + verifyRestoredActiveTasks(restoredTask); stateUpdater.shutdown(Duration.ofMinutes(1)); - verifyRemovedTasks(activeTask, standbyTask); - verify(activeTask).maybeCheckpoint(true); - verify(standbyTask).maybeCheckpoint(true); + final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> stateUpdater.start()); + + assertEquals("State updater started with non-empty output queues." + + " This indicates a bug. 
Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the" + + " dev-mailing list (https://kafka.apache.org/contact).", exception.getMessage()); } @Test - public void shouldRemovePausedTasksOnShutdown() throws Exception { - final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); - final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build(); + public void shouldThrowIfRestartedWithNonEmptyFailedTasks() throws Exception { + final StreamTask failedTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); + final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(TASK_0_0)); + doThrow(taskCorruptedException).when(changelogReader).restore(mkMap(mkEntry(TASK_0_0, failedTask))); stateUpdater.start(); - stateUpdater.add(activeTask); - stateUpdater.add(standbyTask); - verifyUpdatingTasks(activeTask, standbyTask); - when(topologyMetadata.isPaused(null)).thenReturn(true); - verifyPausedTasks(activeTask, standbyTask); - verifyRemovedTasks(); - + stateUpdater.add(failedTask); + verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedTask)); stateUpdater.shutdown(Duration.ofMinutes(1)); - verifyRemovedTasks(activeTask, standbyTask); - } + final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> stateUpdater.start()); - @Test - public void shouldRemovePausedAndUpdatingTasksOnShutdown() throws Exception { - final StreamTask activeTask = statefulTask(TASK_A_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); - final StandbyTask standbyTask = standbyTask(TASK_B_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build(); - - when(topologyMetadata.isPaused(standbyTask.id().topologyName())).thenReturn(false).thenReturn(true); - - stateUpdater.start(); - stateUpdater.add(activeTask); - stateUpdater.add(standbyTask); 
- verifyPausedTasks(standbyTask); - verifyUpdatingTasks(activeTask); - verifyRemovedTasks(); - - stateUpdater.shutdown(Duration.ofMinutes(1)); - - verifyRemovedTasks(activeTask, standbyTask); + assertEquals("State updater started with non-empty output queues." + + " This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the" + + " dev-mailing list (https://kafka.apache.org/contact).", exception.getMessage()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index a95393bcded..e02e579030a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -176,7 +176,9 @@ public class TaskManagerTest { private final TaskId taskId05 = new TaskId(0, 5); private final TopicPartition t1p5 = new TopicPartition(topic1, 5); + private final TopicPartition t1p5changelog = new TopicPartition("changelog", 5); private final Set taskId05Partitions = mkSet(t1p5); + private final Set taskId05ChangelogPartitions = mkSet(t1p5changelog); private final TaskId taskId10 = new TaskId(1, 0); private final TopicPartition t2p0 = new TopicPartition(topic2, 0); @@ -691,7 +693,7 @@ public class TaskManagerTest { Collections.emptyMap() ); - verify(stateUpdater, never()).remove(reassignedActiveTask.id()); + verify(stateUpdater, never()).removeWithFuture(reassignedActiveTask.id()); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } @@ -730,7 +732,7 @@ public class TaskManagerTest { mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), taskId03Partitions)) ); - verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id()); + verify(stateUpdater, 
never()).removeWithFuture(standbyTaskToUpdateInputPartitions.id()); verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } @@ -1339,7 +1341,7 @@ public class TaskManagerTest { verify(task1).closeClean(); verify(task3).suspend(); verify(task3).closeClean(); - verify(stateUpdater, never()).remove(task2.id()); + verify(stateUpdater, never()).removeWithFuture(task2.id()); } @Test @@ -3381,50 +3383,80 @@ public class TaskManagerTest { } @Test - public void shouldShutDownStateUpdaterAndAddRestoredTasksToTaskRegistry() { - final TasksRegistry tasks = mock(TasksRegistry.class); - final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions) - .inState(State.RESTORING).build(); - final StreamTask statefulTask2 = statefulTask(taskId02, taskId02ChangelogPartitions) - .inState(State.RESTORING).build(); - final Set restoredActiveTasks = mkSet(statefulTask1, statefulTask2); - final Set restoredTasks = restoredActiveTasks.stream().map(t -> (Task) t).collect(Collectors.toSet()); - when(stateUpdater.drainRestoredActiveTasks(Duration.ZERO)).thenReturn(restoredActiveTasks); - when(tasks.activeTasks()).thenReturn(restoredTasks); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); - - taskManager.shutdown(true); - - verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask1.id()); - verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask2.id()); - verify(activeTaskCreator).closeThreadProducerIfNeeded(); - verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE)); - verify(tasks).addActiveTasks(restoredTasks); - verify(statefulTask1).closeClean(); - verify(statefulTask2).closeClean(); - } - - @Test - public void shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() { + public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() { final TasksRegistry tasks = 
mock(TasksRegistry.class); final StreamTask removedStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions) .inState(State.RESTORING).build(); final StandbyTask removedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) .inState(State.RUNNING).build(); - when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedStandbyTask, removedStatefulTask)); - when(tasks.activeTasks()).thenReturn(mkSet(removedStatefulTask)); - when(tasks.allTasks()).thenReturn(mkSet(removedStatefulTask, removedStandbyTask)); + final StreamTask removedFailedStatefulTask = statefulTask(taskId03, taskId03ChangelogPartitions) + .inState(State.RESTORING).build(); + final StandbyTask removedFailedStandbyTask = standbyTask(taskId04, taskId04ChangelogPartitions) + .inState(State.RUNNING).build(); + final StreamTask removedFailedStatefulTaskDuringRemoval = statefulTask(taskId05, taskId05ChangelogPartitions) + .inState(State.RESTORING).build(); + final StandbyTask removedFailedStandbyTaskDuringRemoval = standbyTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING).build(); + when(stateUpdater.getTasks()) + .thenReturn(mkSet( + removedStatefulTask, + removedStandbyTask, + removedFailedStatefulTask, + removedFailedStandbyTask, + removedFailedStatefulTaskDuringRemoval, + removedFailedStandbyTaskDuringRemoval + )); + final CompletableFuture futureForRemovedStatefulTask = new CompletableFuture<>(); + final CompletableFuture futureForRemovedStandbyTask = new CompletableFuture<>(); + final CompletableFuture futureForRemovedFailedStatefulTask = new CompletableFuture<>(); + final CompletableFuture futureForRemovedFailedStandbyTask = new CompletableFuture<>(); + final CompletableFuture futureForRemovedFailedStatefulTaskDuringRemoval = new CompletableFuture<>(); + final CompletableFuture futureForRemovedFailedStandbyTaskDuringRemoval = new CompletableFuture<>(); + when(stateUpdater.removeWithFuture(removedStatefulTask.id())).thenReturn(futureForRemovedStatefulTask); + 
when(stateUpdater.removeWithFuture(removedStandbyTask.id())).thenReturn(futureForRemovedStandbyTask); + when(stateUpdater.removeWithFuture(removedFailedStatefulTask.id())).thenReturn(futureForRemovedFailedStatefulTask); + when(stateUpdater.removeWithFuture(removedFailedStandbyTask.id())).thenReturn(futureForRemovedFailedStandbyTask); + when(stateUpdater.removeWithFuture(removedFailedStatefulTaskDuringRemoval.id())) + .thenReturn(futureForRemovedFailedStatefulTaskDuringRemoval); + when(stateUpdater.removeWithFuture(removedFailedStandbyTaskDuringRemoval.id())) + .thenReturn(futureForRemovedFailedStandbyTaskDuringRemoval); + when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList( + new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStatefulTaskDuringRemoval), + new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStandbyTaskDuringRemoval) + )); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + futureForRemovedStatefulTask.complete(new StateUpdater.RemovedTaskResult(removedStatefulTask)); + futureForRemovedStandbyTask.complete(new StateUpdater.RemovedTaskResult(removedStandbyTask)); + futureForRemovedFailedStatefulTask + .complete(new StateUpdater.RemovedTaskResult(removedFailedStatefulTask, new StreamsException("KABOOM!"))); + futureForRemovedFailedStandbyTask + .complete(new StateUpdater.RemovedTaskResult(removedFailedStandbyTask, new StreamsException("KABOOM!"))); + futureForRemovedFailedStatefulTaskDuringRemoval + .completeExceptionally(new StreamsException("KABOOM!")); + futureForRemovedFailedStandbyTaskDuringRemoval + .completeExceptionally(new StreamsException("KABOOM!")); taskManager.shutdown(true); - verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id()); - verify(activeTaskCreator).closeThreadProducerIfNeeded(); verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE)); - 
verify(tasks).addActiveTasks(mkSet(removedStatefulTask)); - verify(tasks).addStandbyTasks(mkSet(removedStandbyTask)); - verify(removedStatefulTask).closeClean(); - verify(removedStandbyTask).closeClean(); + verify(tasks).addTask(removedStatefulTask); + verify(tasks).addTask(removedStandbyTask); + verify(removedFailedStatefulTask).prepareCommit(); + verify(removedFailedStatefulTask).suspend(); + verify(removedFailedStatefulTask).closeDirty(); + verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId03); + verify(removedFailedStandbyTask).prepareCommit(); + verify(removedFailedStandbyTask).suspend(); + verify(removedFailedStandbyTask).closeDirty(); + verify(activeTaskCreator, never()).closeAndRemoveTaskProducerIfNeeded(taskId04); + verify(removedFailedStatefulTaskDuringRemoval).prepareCommit(); + verify(removedFailedStatefulTaskDuringRemoval).suspend(); + verify(removedFailedStatefulTaskDuringRemoval).closeDirty(); + verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId05); + verify(removedFailedStandbyTaskDuringRemoval).prepareCommit(); + verify(removedFailedStandbyTaskDuringRemoval).suspend(); + verify(removedFailedStandbyTaskDuringRemoval).closeDirty(); + verify(activeTaskCreator, never()).closeAndRemoveTaskProducerIfNeeded(taskId00); } @Test