KAFKA-10199: Shutdown with new remove operation in state updater (#15894)

Uses the new remove operation of the state updater that returns
a future to shutdown the task manager.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Bruno Cadonna 2024-05-13 14:33:58 +02:00 committed by GitHub
parent 6161fd0db2
commit 5439914c32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 183 additions and 239 deletions

View File

@ -149,14 +149,23 @@ public class DefaultStateUpdater implements StateUpdater {
} catch (final RuntimeException anyOtherException) {
handleRuntimeException(anyOtherException);
} finally {
removeAddedTasksFromInputQueue();
removeUpdatingAndPausedTasks();
clearInputQueue();
clearUpdatingAndPausedTasks();
updaterMetrics.clear();
shutdownGate.countDown();
log.info("State updater thread stopped");
}
}
private void clearInputQueue() {
tasksAndActionsLock.lock();
try {
tasksAndActions.clear();
} finally {
tasksAndActionsLock.unlock();
}
}
// In each iteration:
// 1) check if updating tasks need to be paused
// 2) check if paused tasks need to be resumed
@ -444,17 +453,10 @@ public class DefaultStateUpdater implements StateUpdater {
!isTopologyResumed.get();
}
private void removeUpdatingAndPausedTasks() {
changelogReader.clear();
measureCheckpointLatency(() -> updatingTasks.forEach((id, task) -> {
task.maybeCheckpoint(true);
removedTasks.add(task);
}));
private void clearUpdatingAndPausedTasks() {
updatingTasks.clear();
pausedTasks.forEach((id, task) -> {
removedTasks.add(task);
});
pausedTasks.clear();
changelogReader.clear();
}
private List<TaskAndAction> getTasksAndActions() {
@ -506,16 +508,14 @@ public class DefaultStateUpdater implements StateUpdater {
private void removeTask(final TaskId taskId, final CompletableFuture<RemovedTaskResult> future) {
try {
if (!removeUpdatingTask(taskId, future)) {
if (!removePausedTask(taskId, future)) {
if (!removeRestoredTask(taskId, future)) {
if (!removeFailedTask(taskId, future)) {
if (!removeUpdatingTask(taskId, future)
&& !removePausedTask(taskId, future)
&& !removeRestoredTask(taskId, future)
&& !removeFailedTask(taskId, future)) {
future.complete(null);
log.warn("Task " + taskId + " could not be removed from the state updater because "
+ "the state updater does not own this task.");
}
}
}
log.warn("Task {} could not be removed from the state updater because the state updater does not"
+ " own this task.", taskId);
}
} catch (final StreamsException streamsException) {
handleStreamsException(streamsException);
@ -787,9 +787,12 @@ public class DefaultStateUpdater implements StateUpdater {
this.log = logContext.logger(DefaultStateUpdater.class);
}
@Override
public void start() {
if (stateUpdaterThread == null) {
if (!restoredActiveTasks.isEmpty() || !exceptionsAndFailedTasks.isEmpty()) {
throw new IllegalStateException("State updater started with non-empty output queues. "
+ BUG_ERROR_MESSAGE);
}
stateUpdaterThread = new StateUpdaterThread(name, metrics, changelogReader);
stateUpdaterThread.start();
shutdownGate = new CountDownLatch(1);
@ -823,23 +826,6 @@ public class DefaultStateUpdater implements StateUpdater {
stateUpdaterThread = null;
} catch (final InterruptedException ignored) {
}
} else {
removeAddedTasksFromInputQueue();
}
}
private void removeAddedTasksFromInputQueue() {
tasksAndActionsLock.lock();
try {
TaskAndAction taskAndAction;
while ((taskAndAction = tasksAndActions.peek()) != null) {
if (taskAndAction.action() == Action.ADD) {
removedTasks.add(taskAndAction.task());
}
tasksAndActions.poll();
}
} finally {
tasksAndActionsLock.unlock();
}
}

View File

@ -225,6 +225,7 @@ public interface StateUpdater {
* <li>{@link StateUpdater#drainRestoredActiveTasks(Duration)}</li>
* <li>{@link StateUpdater#drainRemovedTasks()}</li>
* <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
* <li>{@link StateUpdater#removeWithFuture(org.apache.kafka.streams.processor.TaskId)}</li>
* </ul>
*
* @return set of all tasks managed by the state updater
@ -251,6 +252,7 @@ public interface StateUpdater {
* <li>{@link StateUpdater#drainRestoredActiveTasks(Duration)}</li>
* <li>{@link StateUpdater#drainRemovedTasks()}</li>
* <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
* <li>{@link StateUpdater#removeWithFuture(org.apache.kafka.streams.processor.TaskId)}</li>
* </ul>
*
* @return {@code true} if the state updater restores active tasks, {@code false} otherwise

View File

@ -947,8 +947,7 @@ public class TaskManager {
}
}
/** Returns true if the task closed clean */
private boolean closeTaskClean(final Task task,
private void closeTaskClean(final Task task,
final Set<Task> tasksToCloseDirty,
final Map<TaskId, RuntimeException> taskExceptions) {
try {
@ -957,7 +956,6 @@ public class TaskManager {
if (task.isActive()) {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
}
return true;
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
"Attempting to close remaining tasks before re-throwing:", task.id());
@ -968,7 +966,6 @@ public class TaskManager {
}
taskExceptions.putIfAbsent(task.id(), e);
return false;
}
}
@ -1037,44 +1034,6 @@ public class TaskManager {
return taskExceptions;
}
private void handleRemovedTasksFromStateUpdater() {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
for (final Task task : stateUpdater.drainRemovedTasks()) {
Set<TopicPartition> inputPartitions;
if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions);
} else if (tasks.removePendingTaskToAddBack(task.id())) {
stateUpdater.add(task);
} else if (tasks.removePendingTaskToCloseClean(task.id())) {
closeTaskClean(task, tasksToCloseDirty, taskExceptions);
} else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) {
if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
task.revive();
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
addTaskToStateUpdater(task);
}
} else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
stateUpdater.add(task);
} else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
task.suspend();
tasks.addTask(task);
} else {
throw new IllegalStateException("Got a removed task " + task.id() + " from the state updater " +
"that is not for recycle, closing, or updating input partitions; this should not happen");
}
}
// for tasks that cannot be cleanly closed or recycled, close them dirty
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task, false);
}
maybeThrowTaskExceptions(taskExceptions);
}
private void handleRestoredTasksFromStateUpdater(final long now,
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
final Duration timeout = Duration.ZERO;
@ -1523,10 +1482,26 @@ public class TaskManager {
private void shutdownStateUpdater() {
if (stateUpdater != null) {
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
for (final Task task : stateUpdater.getTasks()) {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
futures.put(task.id(), future);
}
final Set<Task> tasksToCloseClean = new HashSet<>();
final Set<Task> tasksToCloseDirty = new HashSet<>();
addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
closeFailedTasksFromStateUpdater();
addRestoredTasksToTaskRegistry();
addRemovedTasksToTaskRegistry();
for (final Task task : tasksToCloseClean) {
tasks.addTask(task);
}
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task, false);
}
for (final StateUpdater.ExceptionAndTask exceptionAndTask : stateUpdater.drainExceptionsAndFailedTasks()) {
final Task failedTask = exceptionAndTask.task();
closeTaskDirty(failedTask, false);
}
}
}
@ -1536,59 +1511,6 @@ public class TaskManager {
}
}
private void closeFailedTasksFromStateUpdater() {
final Set<Task> tasksToCloseDirty = stateUpdater.drainExceptionsAndFailedTasks().stream()
.map(StateUpdater.ExceptionAndTask::task).collect(Collectors.toSet());
for (final Task task : tasksToCloseDirty) {
try {
// we call this function only to flush the case if necessary
// before suspending and closing the topology
task.prepareCommit();
} catch (final RuntimeException swallow) {
log.error("Error flushing caches of dirty task {} ", task.id(), swallow);
}
try {
task.suspend();
} catch (final RuntimeException swallow) {
log.error("Error suspending dirty task {}: {}", task.id(), swallow.getMessage());
}
task.closeDirty();
try {
if (task.isActive()) {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
}
} catch (final RuntimeException swallow) {
log.error("Error closing dirty task {}: {}", task.id(), swallow.getMessage());
}
}
}
private void addRestoredTasksToTaskRegistry() {
tasks.addActiveTasks(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).stream()
.map(t -> (Task) t)
.collect(Collectors.toSet())
);
}
private void addRemovedTasksToTaskRegistry() {
final Set<Task> removedTasks = stateUpdater.drainRemovedTasks();
final Set<Task> removedActiveTasks = new HashSet<>();
final Iterator<Task> iterator = removedTasks.iterator();
while (iterator.hasNext()) {
final Task task = iterator.next();
if (task.isActive()) {
iterator.remove();
removedActiveTasks.add(task);
}
}
tasks.addActiveTasks(removedActiveTasks);
tasks.addStandbyTasks(removedTasks);
}
/**
* Closes and cleans up after the provided tasks, including closing their corresponding task producers
*/

View File

@ -547,9 +547,6 @@ public class RestoreIntegrationTest {
streams1.close();
}
waitForTransitionTo(transitionedStates1, State.NOT_RUNNING, Duration.ofSeconds(60));
if (stateUpdaterEnabled) {
assertThat(standbyUpdateListener.promotedPartitions.size(), CoreMatchers.equalTo(1));
}
assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(initialStoreCloseCount + 4));
}

View File

@ -128,11 +128,55 @@ class DefaultStateUpdaterTest {
}
@Test
public void shouldShutdownStateUpdater() {
public void shouldShutdownStateUpdater() throws Exception {
final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build();
final StreamTask restoredStatefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamTask failedStatefulTask = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_B_0));
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(TASK_1_1));
doThrow(taskCorruptedException).when(changelogReader).restore(mkMap(
mkEntry(TASK_1_1, failedStatefulTask),
mkEntry(TASK_0_2, standbyTask)
));
stateUpdater.add(statelessTask);
stateUpdater.add(restoredStatefulTask);
stateUpdater.add(failedStatefulTask);
stateUpdater.add(standbyTask);
stateUpdater.start();
verifyRestoredActiveTasks(statelessTask, restoredStatefulTask);
verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedStatefulTask));
verifyUpdatingTasks(standbyTask);
verifyPausedTasks();
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRestoredActiveTasks(statelessTask, restoredStatefulTask);
verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedStatefulTask));
verifyUpdatingTasks();
verifyPausedTasks();
verify(changelogReader).clear();
}
@Test
public void shouldShutdownStateUpdaterWithPausedTasks() throws Exception {
final StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
when(topologyMetadata.isPaused(null)).thenReturn(true);
stateUpdater.add(statefulTask);
stateUpdater.add(standbyTask);
stateUpdater.start();
verifyRestoredActiveTasks();
verifyExceptionsAndFailedTasks();
verifyUpdatingTasks();
verifyPausedTasks(statefulTask, standbyTask);
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRestoredActiveTasks();
verifyExceptionsAndFailedTasks();
verifyUpdatingTasks();
verifyPausedTasks();
verify(changelogReader).clear();
}
@ -150,76 +194,37 @@ class DefaultStateUpdaterTest {
}
@Test
public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build();
final StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
stateUpdater.add(statelessTask);
stateUpdater.add(statefulTask);
stateUpdater.removeWithFuture(TASK_1_1);
stateUpdater.add(standbyTask);
verifyRemovedTasks();
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRemovedTasks(statelessTask, statefulTask, standbyTask);
}
@Test
public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, new StreamsConfig(configProps(Integer.MAX_VALUE)), null, changelogReader, topologyMetadata, time);
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
public void shouldThrowIfRestartedWithNonEmptyRestoredTasks() throws Exception {
final StreamTask restoredTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_A_1));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
verifyUpdatingTasks(activeTask, standbyTask);
verifyRemovedTasks();
stateUpdater.add(restoredTask);
verifyRestoredActiveTasks(restoredTask);
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRemovedTasks(activeTask, standbyTask);
verify(activeTask).maybeCheckpoint(true);
verify(standbyTask).maybeCheckpoint(true);
final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> stateUpdater.start());
assertEquals("State updater started with non-empty output queues."
+ " This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the"
+ " dev-mailing list (https://kafka.apache.org/contact).", exception.getMessage());
}
@Test
public void shouldRemovePausedTasksOnShutdown() throws Exception {
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
public void shouldThrowIfRestartedWithNonEmptyFailedTasks() throws Exception {
final StreamTask failedTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(TASK_0_0));
doThrow(taskCorruptedException).when(changelogReader).restore(mkMap(mkEntry(TASK_0_0, failedTask)));
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
verifyUpdatingTasks(activeTask, standbyTask);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(activeTask, standbyTask);
verifyRemovedTasks();
stateUpdater.add(failedTask);
verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedTask));
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRemovedTasks(activeTask, standbyTask);
}
final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> stateUpdater.start());
@Test
public void shouldRemovePausedAndUpdatingTasksOnShutdown() throws Exception {
final StreamTask activeTask = statefulTask(TASK_A_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_B_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
when(topologyMetadata.isPaused(standbyTask.id().topologyName())).thenReturn(false).thenReturn(true);
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
verifyPausedTasks(standbyTask);
verifyUpdatingTasks(activeTask);
verifyRemovedTasks();
stateUpdater.shutdown(Duration.ofMinutes(1));
verifyRemovedTasks(activeTask, standbyTask);
assertEquals("State updater started with non-empty output queues."
+ " This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the"
+ " dev-mailing list (https://kafka.apache.org/contact).", exception.getMessage());
}
@Test

View File

@ -176,7 +176,9 @@ public class TaskManagerTest {
private final TaskId taskId05 = new TaskId(0, 5);
private final TopicPartition t1p5 = new TopicPartition(topic1, 5);
private final TopicPartition t1p5changelog = new TopicPartition("changelog", 5);
private final Set<TopicPartition> taskId05Partitions = mkSet(t1p5);
private final Set<TopicPartition> taskId05ChangelogPartitions = mkSet(t1p5changelog);
private final TaskId taskId10 = new TaskId(1, 0);
private final TopicPartition t2p0 = new TopicPartition(topic2, 0);
@ -691,7 +693,7 @@ public class TaskManagerTest {
Collections.emptyMap()
);
verify(stateUpdater, never()).remove(reassignedActiveTask.id());
verify(stateUpdater, never()).removeWithFuture(reassignedActiveTask.id());
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@ -730,7 +732,7 @@ public class TaskManagerTest {
mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), taskId03Partitions))
);
verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
verify(stateUpdater, never()).removeWithFuture(standbyTaskToUpdateInputPartitions.id());
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@ -1339,7 +1341,7 @@ public class TaskManagerTest {
verify(task1).closeClean();
verify(task3).suspend();
verify(task3).closeClean();
verify(stateUpdater, never()).remove(task2.id());
verify(stateUpdater, never()).removeWithFuture(task2.id());
}
@Test
@ -3381,50 +3383,80 @@ public class TaskManagerTest {
}
@Test
public void shouldShutDownStateUpdaterAndAddRestoredTasksToTaskRegistry() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RESTORING).build();
final StreamTask statefulTask2 = statefulTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RESTORING).build();
final Set<StreamTask> restoredActiveTasks = mkSet(statefulTask1, statefulTask2);
final Set<Task> restoredTasks = restoredActiveTasks.stream().map(t -> (Task) t).collect(Collectors.toSet());
when(stateUpdater.drainRestoredActiveTasks(Duration.ZERO)).thenReturn(restoredActiveTasks);
when(tasks.activeTasks()).thenReturn(restoredTasks);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.shutdown(true);
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
verify(activeTaskCreator).closeThreadProducerIfNeeded();
verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
verify(tasks).addActiveTasks(restoredTasks);
verify(statefulTask1).closeClean();
verify(statefulTask2).closeClean();
}
@Test
public void shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() {
public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final StreamTask removedStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RESTORING).build();
final StandbyTask removedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING).build();
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(removedStandbyTask, removedStatefulTask));
when(tasks.activeTasks()).thenReturn(mkSet(removedStatefulTask));
when(tasks.allTasks()).thenReturn(mkSet(removedStatefulTask, removedStandbyTask));
final StreamTask removedFailedStatefulTask = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RESTORING).build();
final StandbyTask removedFailedStandbyTask = standbyTask(taskId04, taskId04ChangelogPartitions)
.inState(State.RUNNING).build();
final StreamTask removedFailedStatefulTaskDuringRemoval = statefulTask(taskId05, taskId05ChangelogPartitions)
.inState(State.RESTORING).build();
final StandbyTask removedFailedStandbyTaskDuringRemoval = standbyTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING).build();
when(stateUpdater.getTasks())
.thenReturn(mkSet(
removedStatefulTask,
removedStandbyTask,
removedFailedStatefulTask,
removedFailedStandbyTask,
removedFailedStatefulTaskDuringRemoval,
removedFailedStandbyTaskDuringRemoval
));
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedStatefulTask = new CompletableFuture<>();
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedStandbyTask = new CompletableFuture<>();
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStatefulTask = new CompletableFuture<>();
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStandbyTask = new CompletableFuture<>();
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStatefulTaskDuringRemoval = new CompletableFuture<>();
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStandbyTaskDuringRemoval = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(removedStatefulTask.id())).thenReturn(futureForRemovedStatefulTask);
when(stateUpdater.removeWithFuture(removedStandbyTask.id())).thenReturn(futureForRemovedStandbyTask);
when(stateUpdater.removeWithFuture(removedFailedStatefulTask.id())).thenReturn(futureForRemovedFailedStatefulTask);
when(stateUpdater.removeWithFuture(removedFailedStandbyTask.id())).thenReturn(futureForRemovedFailedStandbyTask);
when(stateUpdater.removeWithFuture(removedFailedStatefulTaskDuringRemoval.id()))
.thenReturn(futureForRemovedFailedStatefulTaskDuringRemoval);
when(stateUpdater.removeWithFuture(removedFailedStandbyTaskDuringRemoval.id()))
.thenReturn(futureForRemovedFailedStandbyTaskDuringRemoval);
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(
new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStatefulTaskDuringRemoval),
new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStandbyTaskDuringRemoval)
));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
futureForRemovedStatefulTask.complete(new StateUpdater.RemovedTaskResult(removedStatefulTask));
futureForRemovedStandbyTask.complete(new StateUpdater.RemovedTaskResult(removedStandbyTask));
futureForRemovedFailedStatefulTask
.complete(new StateUpdater.RemovedTaskResult(removedFailedStatefulTask, new StreamsException("KABOOM!")));
futureForRemovedFailedStandbyTask
.complete(new StateUpdater.RemovedTaskResult(removedFailedStandbyTask, new StreamsException("KABOOM!")));
futureForRemovedFailedStatefulTaskDuringRemoval
.completeExceptionally(new StreamsException("KABOOM!"));
futureForRemovedFailedStandbyTaskDuringRemoval
.completeExceptionally(new StreamsException("KABOOM!"));
taskManager.shutdown(true);
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
verify(activeTaskCreator).closeThreadProducerIfNeeded();
verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
verify(tasks).addActiveTasks(mkSet(removedStatefulTask));
verify(tasks).addStandbyTasks(mkSet(removedStandbyTask));
verify(removedStatefulTask).closeClean();
verify(removedStandbyTask).closeClean();
verify(tasks).addTask(removedStatefulTask);
verify(tasks).addTask(removedStandbyTask);
verify(removedFailedStatefulTask).prepareCommit();
verify(removedFailedStatefulTask).suspend();
verify(removedFailedStatefulTask).closeDirty();
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId03);
verify(removedFailedStandbyTask).prepareCommit();
verify(removedFailedStandbyTask).suspend();
verify(removedFailedStandbyTask).closeDirty();
verify(activeTaskCreator, never()).closeAndRemoveTaskProducerIfNeeded(taskId04);
verify(removedFailedStatefulTaskDuringRemoval).prepareCommit();
verify(removedFailedStatefulTaskDuringRemoval).suspend();
verify(removedFailedStatefulTaskDuringRemoval).closeDirty();
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId05);
verify(removedFailedStandbyTaskDuringRemoval).prepareCommit();
verify(removedFailedStandbyTaskDuringRemoval).suspend();
verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
verify(activeTaskCreator, never()).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test