KAFKA-10199: Remove lost tasks in state updater with new remove (#15870)

Uses the new remove operation of the state updater that returns a future to remove lost tasks from the state udpater.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Bruno Cadonna 2024-05-07 14:26:23 +02:00 committed by GitHub
parent ea485a7061
commit cb35ddc5ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 103 additions and 8 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
@ -71,6 +73,11 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMo
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
public class TaskManager {
private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
"Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. " + BUG_ERROR_MESSAGE;
// initialize the task list
// activeTasks needs to be concurrent as it can be accessed
// by QueryableState
@ -602,6 +609,51 @@ public class TaskManager {
tasks.addPendingTaskToCloseClean(taskId);
}
private void addToTasksToClose(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
final Set<Task> tasksToCloseCleanFromStateUpdater,
final Set<Task> tasksToCloseDirtyFromStateUpdater) {
iterateAndActOnFuture(futures, removedTaskResult -> {
final Task task = removedTaskResult.task();
final Optional<RuntimeException> exception = removedTaskResult.exception();
if (exception.isPresent()) {
tasksToCloseDirtyFromStateUpdater.add(task);
} else {
tasksToCloseCleanFromStateUpdater.add(task);
}
});
}
private void iterateAndActOnFuture(final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures,
final java.util.function.Consumer<StateUpdater.RemovedTaskResult> action) {
for (final Map.Entry<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> entry : futures.entrySet()) {
final TaskId taskId = entry.getKey();
final CompletableFuture<StateUpdater.RemovedTaskResult> future = entry.getValue();
try {
final StateUpdater.RemovedTaskResult removedTaskResult = waitForFuture(taskId, future);
action.accept(removedTaskResult);
} catch (final ExecutionException executionException) {
log.warn("An exception happened when removing task {} from the state updater. The exception will be handled later: ",
taskId, executionException);
} catch (final InterruptedException shouldNotHappen) {
Thread.currentThread().interrupt();
log.error(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen);
throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, shouldNotHappen);
}
}
}
private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId,
final CompletableFuture<StateUpdater.RemovedTaskResult> future)
throws ExecutionException, InterruptedException {
final StateUpdater.RemovedTaskResult removedTaskResult = future.get();
if (removedTaskResult == null) {
throw new IllegalStateException("Task " + taskId + " was not found in the state updater. "
+ BUG_ERROR_MESSAGE);
}
return removedTaskResult;
}
private Map<TaskId, Set<TopicPartition>> pendingTasksToCreate(final Map<TaskId, Set<TopicPartition>> tasksToCreate) {
final Map<TaskId, Set<TopicPartition>> pendingTasks = new HashMap<>();
final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> iter = tasksToCreate.entrySet().iterator();
@ -1186,12 +1238,23 @@ public class TaskManager {
private void removeLostActiveTasksFromStateUpdater() {
if (stateUpdater != null) {
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
final Map<TaskId, RuntimeException> failedTasksDuringCleanClose = new HashMap<>();
final Set<Task> tasksToCloseClean = new HashSet<>();
final Set<Task> tasksToCloseDirty = new HashSet<>();
for (final Task restoringTask : stateUpdater.getTasks()) {
if (restoringTask.isActive()) {
tasks.addPendingTaskToCloseClean(restoringTask.id());
stateUpdater.remove(restoringTask.id());
futures.put(restoringTask.id(), stateUpdater.removeWithFuture(restoringTask.id()));
}
}
addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
for (final Task task : tasksToCloseClean) {
closeTaskClean(task, tasksToCloseDirty, failedTasksDuringCleanClose);
}
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task, false);
}
}
}

View File

@ -85,6 +85,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -1409,7 +1410,7 @@ public class TaskManagerTest {
}
@Test
public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
public void shouldCloseCleanWhenRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
@ -1421,15 +1422,46 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2, task3), tasks);
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1);
future1.complete(new StateUpdater.RemovedTaskResult(task1));
final CompletableFuture<StateUpdater.RemovedTaskResult> future3 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task3.id())).thenReturn(future3);
future3.complete(new StateUpdater.RemovedTaskResult(task3));
taskManager.handleLostAll();
verify(stateUpdater).remove(task1.id());
verify(task1).suspend();
verify(task1).closeClean();
verify(task3).suspend();
verify(task3).closeClean();
verify(stateUpdater, never()).remove(task2.id());
verify(stateUpdater).remove(task3.id());
verify(tasks).addPendingTaskToCloseClean(task1.id());
verify(tasks, never()).addPendingTaskToCloseClean(task2.id());
verify(tasks).addPendingTaskToCloseClean(task3.id());
}
@Test
public void shouldCloseDirtyWhenRemoveFailedActiveTasksFromStateUpdaterOnPartitionLost() {
final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final StreamTask task2 = statefulTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2), tasks);
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1);
future1.complete(new StateUpdater.RemovedTaskResult(task1, new StreamsException("Something happened")));
final CompletableFuture<StateUpdater.RemovedTaskResult> future3 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task2.id())).thenReturn(future3);
future3.complete(new StateUpdater.RemovedTaskResult(task2));
taskManager.handleLostAll();
verify(task1).prepareCommit();
verify(task1).suspend();
verify(task1).closeDirty();
verify(task2).suspend();
verify(task2).closeClean();
}
private TaskManager setupForRevocationAndLost(final Set<Task> tasksInStateUpdater,