From cb35ddc5ca233d5cca6f51c1c41b952a7e9fe1a0 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Tue, 7 May 2024 14:26:23 +0200 Subject: [PATCH] KAFKA-10199: Remove lost tasks in state updater with new remove (#15870) Uses the new remove operation of the state updater that returns a future to remove lost tasks from the state updater. Reviewer: Lucas Brutschy --- .../processor/internals/TaskManager.java | 67 ++++++++++++++++++- .../processor/internals/TaskManagerTest.java | 44 ++++++++++-- 2 files changed, 103 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index ff055b777b4..4b72f648613 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.DeleteRecordsResult; @@ -71,6 +73,11 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMo import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName; public class TaskManager { + + private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " + + "Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact)."; + private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. 
" + BUG_ERROR_MESSAGE; + // initialize the task list // activeTasks needs to be concurrent as it can be accessed // by QueryableState @@ -602,6 +609,51 @@ public class TaskManager { tasks.addPendingTaskToCloseClean(taskId); } + private void addToTasksToClose(final Map> futures, + final Set tasksToCloseCleanFromStateUpdater, + final Set tasksToCloseDirtyFromStateUpdater) { + iterateAndActOnFuture(futures, removedTaskResult -> { + final Task task = removedTaskResult.task(); + final Optional exception = removedTaskResult.exception(); + if (exception.isPresent()) { + tasksToCloseDirtyFromStateUpdater.add(task); + } else { + tasksToCloseCleanFromStateUpdater.add(task); + } + }); + } + + private void iterateAndActOnFuture(final Map> futures, + final java.util.function.Consumer action) { + for (final Map.Entry> entry : futures.entrySet()) { + final TaskId taskId = entry.getKey(); + final CompletableFuture future = entry.getValue(); + try { + final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future); + action.accept(removedTaskResult); + } catch (final ExecutionException executionException) { + log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ", + taskId, executionException); + } catch (final InterruptedException shouldNotHappen) { + Thread.currentThread().interrupt(); + log.error(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen); + throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen); + } + } + } + + private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId, + final CompletableFuture future) + throws ExecutionException, InterruptedException { + + final StateUpdater.RemovedTaskResult removedTaskResult = future.get(); + if (removedTaskResult == null) { + throw new IllegalStateException("Task " + taskId + " was not found in the state updater. 
" + + BUG_ERROR_MESSAGE); + } + return removedTaskResult; + } + private Map> pendingTasksToCreate(final Map> tasksToCreate) { final Map> pendingTasks = new HashMap<>(); final Iterator>> iter = tasksToCreate.entrySet().iterator(); @@ -1186,12 +1238,23 @@ public class TaskManager { private void removeLostActiveTasksFromStateUpdater() { if (stateUpdater != null) { + final Map> futures = new LinkedHashMap<>(); + final Map failedTasksDuringCleanClose = new HashMap<>(); + final Set tasksToCloseClean = new HashSet<>(); + final Set tasksToCloseDirty = new HashSet<>(); for (final Task restoringTask : stateUpdater.getTasks()) { if (restoringTask.isActive()) { - tasks.addPendingTaskToCloseClean(restoringTask.id()); - stateUpdater.remove(restoringTask.id()); + futures.put(restoringTask.id(), stateUpdater.removeWithFuture(restoringTask.id())); } } + + addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty); + for (final Task task : tasksToCloseClean) { + closeTaskClean(task, tasksToCloseDirty, failedTasksDuringCleanClose); + } + for (final Task task : tasksToCloseDirty) { + closeTaskDirty(task, false); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 566e83826bb..d64f3038f13 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -85,6 +85,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -1409,7 +1410,7 @@ public class TaskManagerTest { } @Test - public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() { + public void 
shouldCloseCleanWhenRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() { final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId00Partitions).build(); @@ -1421,15 +1422,46 @@ public class TaskManagerTest { .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2, task3), tasks); + final CompletableFuture future1 = new CompletableFuture<>(); + when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1); + future1.complete(new StateUpdater.RemovedTaskResult(task1)); + final CompletableFuture future3 = new CompletableFuture<>(); + when(stateUpdater.removeWithFuture(task3.id())).thenReturn(future3); + future3.complete(new StateUpdater.RemovedTaskResult(task3)); taskManager.handleLostAll(); - verify(stateUpdater).remove(task1.id()); + verify(task1).suspend(); + verify(task1).closeClean(); + verify(task3).suspend(); + verify(task3).closeClean(); verify(stateUpdater, never()).remove(task2.id()); - verify(stateUpdater).remove(task3.id()); - verify(tasks).addPendingTaskToCloseClean(task1.id()); - verify(tasks, never()).addPendingTaskToCloseClean(task2.id()); - verify(tasks).addPendingTaskToCloseClean(task3.id()); + } + + @Test + public void shouldCloseDirtyWhenRemoveFailedActiveTasksFromStateUpdaterOnPartitionLost() { + final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + final StreamTask task2 = statefulTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId02Partitions).build(); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2), tasks); + final CompletableFuture future1 = new CompletableFuture<>(); + 
when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1); + future1.complete(new StateUpdater.RemovedTaskResult(task1, new StreamsException("Something happened"))); + final CompletableFuture future3 = new CompletableFuture<>(); + when(stateUpdater.removeWithFuture(task2.id())).thenReturn(future3); + future3.complete(new StateUpdater.RemovedTaskResult(task2)); + + taskManager.handleLostAll(); + + verify(task1).prepareCommit(); + verify(task1).suspend(); + verify(task1).closeDirty(); + verify(task2).suspend(); + verify(task2).closeClean(); } private TaskManager setupForRevocationAndLost(final Set tasksInStateUpdater,