KAFKA-19171: Kafka Streams crashes with UnsupportedOperationException (#19507)
CI / build (push) Has been cancelled Details

This PR fixes a regression bug introduced with KAFKA-17203. We need to
pass in mutable collections into `closeTaskClean(...)`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Bruno Cadonna <bruno@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-05-15 21:37:04 -07:00
parent 62c6697ac9
commit 923086dba2
1 changed files with 8 additions and 3 deletions

View File

@ -513,8 +513,14 @@ public class TaskManager {
private void handleTasksPendingInitialization() { private void handleTasksPendingInitialization() {
// All tasks pending initialization are not part of the usual bookkeeping // All tasks pending initialization are not part of the usual bookkeeping
final Set<Task> tasksToCloseDirty = new HashSet<>();
for (final Task task : tasks.drainPendingTasksToInit()) { for (final Task task : tasks.drainPendingTasksToInit()) {
closeTaskClean(task, Collections.emptySet(), Collections.emptyMap()); closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
}
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task, false);
} }
} }
@ -1245,7 +1251,6 @@ public class TaskManager {
private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() { private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
if (stateUpdater != null) { if (stateUpdater != null) {
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>(); final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
final Map<TaskId, RuntimeException> failedTasksDuringCleanClose = new HashMap<>();
final Set<Task> tasksToCloseClean = new HashSet<>(tasks.drainPendingActiveTasksToInit()); final Set<Task> tasksToCloseClean = new HashSet<>(tasks.drainPendingActiveTasksToInit());
final Set<Task> tasksToCloseDirty = new HashSet<>(); final Set<Task> tasksToCloseDirty = new HashSet<>();
for (final Task restoringTask : stateUpdater.tasks()) { for (final Task restoringTask : stateUpdater.tasks()) {
@ -1256,7 +1261,7 @@ public class TaskManager {
addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty); addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
for (final Task task : tasksToCloseClean) { for (final Task task : tasksToCloseClean) {
closeTaskClean(task, tasksToCloseDirty, failedTasksDuringCleanClose); closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
} }
for (final Task task : tasksToCloseDirty) { for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task, false); closeTaskDirty(task, false);