KAFKA-10199: Remove queue-based remove from state updater (#15896)

Removes the unused remove operation from the state updater
that asynchronously removed tasks and put them into an
output queue.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Bruno Cadonna 2024-05-15 09:48:33 +02:00 committed by GitHub
parent bf88013a28
commit d2e6c86632
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 63 additions and 550 deletions

View File

@ -852,7 +852,7 @@ public class DefaultStateUpdater implements StateUpdater {
}
@Override
public CompletableFuture<RemovedTaskResult> removeWithFuture(final TaskId taskId) {
public CompletableFuture<RemovedTaskResult> remove(final TaskId taskId) {
final CompletableFuture<RemovedTaskResult> future = new CompletableFuture<>();
tasksAndActionsLock.lock();
try {
@ -864,17 +864,6 @@ public class DefaultStateUpdater implements StateUpdater {
return future;
}
@Override
public void remove(final TaskId taskId) {
tasksAndActionsLock.lock();
try {
tasksAndActions.add(TaskAndAction.createRemoveTask(taskId));
tasksAndActionsCondition.signalAll();
} finally {
tasksAndActionsLock.unlock();
}
}
@Override
public void signalResume() {
tasksAndActionsLock.lock();
@ -914,17 +903,6 @@ public class DefaultStateUpdater implements StateUpdater {
return result;
}
@Override
public Set<Task> drainRemovedTasks() {
final List<Task> result = new ArrayList<>();
removedTasks.drainTo(result);
return new HashSet<>(result);
}
public boolean hasRemovedTasks() {
return !removedTasks.isEmpty();
}
@Override
public List<ExceptionAndTask> drainExceptionsAndFailedTasks() {
final List<ExceptionAndTask> result = new ArrayList<>();

View File

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import java.util.Objects;
import java.util.Set;
public class PendingUpdateAction {
enum Action {
UPDATE_INPUT_PARTITIONS,
CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS,
RECYCLE,
SUSPEND,
ADD_BACK,
CLOSE_CLEAN
}
private final Set<TopicPartition> inputPartitions;
private final Action action;
private PendingUpdateAction(final Action action, final Set<TopicPartition> inputPartitions) {
this.action = action;
this.inputPartitions = inputPartitions;
}
private PendingUpdateAction(final Action action) {
this(action, null);
}
public static PendingUpdateAction createUpdateInputPartition(final Set<TopicPartition> inputPartitions) {
Objects.requireNonNull(inputPartitions, "Set of input partitions to update is null!");
return new PendingUpdateAction(Action.UPDATE_INPUT_PARTITIONS, inputPartitions);
}
public static PendingUpdateAction createCloseReviveAndUpdateInputPartition(final Set<TopicPartition> inputPartitions) {
Objects.requireNonNull(inputPartitions, "Set of input partitions to update is null!");
return new PendingUpdateAction(Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS, inputPartitions);
}
public static PendingUpdateAction createRecycleTask(final Set<TopicPartition> inputPartitions) {
Objects.requireNonNull(inputPartitions, "Set of input partitions to update is null!");
return new PendingUpdateAction(Action.RECYCLE, inputPartitions);
}
public static PendingUpdateAction createSuspend() {
return new PendingUpdateAction(Action.SUSPEND);
}
public static PendingUpdateAction createAddBack() {
return new PendingUpdateAction(Action.ADD_BACK);
}
public static PendingUpdateAction createCloseClean() {
return new PendingUpdateAction(Action.CLOSE_CLEAN);
}
public Set<TopicPartition> getInputPartitions() {
if (action != Action.UPDATE_INPUT_PARTITIONS && action != Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS && action != Action.RECYCLE) {
throw new IllegalStateException("Action type " + action + " does not have a set of input partitions!");
}
return inputPartitions;
}
public Action getAction() {
return action;
}
@Override
public String toString() {
return "PendingUpdateAction{" +
"inputPartitions=" + inputPartitions +
", action=" + action +
'}';
}
}

View File

@ -136,19 +136,6 @@ public interface StateUpdater {
*/
void add(final Task task);
/**
* Removes a task (active or standby) from the state updater and adds the removed task to the removed tasks.
*
* This method does not block until the removed task is removed from the state updater.
*
* The task to be removed is not removed from the restored active tasks and the failed tasks.
* Stateless tasks will never be added to the removed tasks since they are immediately added to the
* restored active tasks.
*
* @param taskId ID of the task to remove
*/
void remove(final TaskId taskId);
/**
* Removes a task (active or standby) from the state updater.
*
@ -158,7 +145,7 @@ public interface StateUpdater {
*
* @param taskId ID of the task to remove
*/
CompletableFuture<RemovedTaskResult> removeWithFuture(final TaskId taskId);
CompletableFuture<RemovedTaskResult> remove(final TaskId taskId);
/**
* Wakes up the state updater if it is currently dormant, to check if a paused task should be resumed.
@ -178,27 +165,6 @@ public interface StateUpdater {
*/
Set<StreamTask> drainRestoredActiveTasks(final Duration timeout);
/**
* Drains the removed tasks (active and standbys) from the state updater.
*
* Removed tasks returned by this method are tasks extraordinarily removed from the state updater. These do not
* include restored or failed tasks.
*
* The returned removed tasks are removed from the state updater
*
* @return set of tasks removed from the state updater
*/
Set<Task> drainRemovedTasks();
/**
* Checks if the state updater has any tasks that should be removed and returned to the StreamThread
* using `drainRemovedTasks`.
*
* @return true if a subsequent call to `drainRemovedTasks` would return a non-empty collection.
*/
boolean hasRemovedTasks();
/**
* Drains the failed tasks and the corresponding exceptions.
*
@ -223,9 +189,8 @@ public interface StateUpdater {
* not been removed from the state updater with one of the following methods:
* <ul>
* <li>{@link StateUpdater#drainRestoredActiveTasks(Duration)}</li>
* <li>{@link StateUpdater#drainRemovedTasks()}</li>
* <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
* <li>{@link StateUpdater#removeWithFuture(org.apache.kafka.streams.processor.TaskId)}</li>
* <li>{@link StateUpdater#remove(org.apache.kafka.streams.processor.TaskId)}</li>
* </ul>
*
* @return set of all tasks managed by the state updater
@ -236,8 +201,8 @@ public interface StateUpdater {
* Gets all tasks that are currently being restored inside the state updater.
*
* Tasks that have just being added into the state updater via {@link StateUpdater#add(Task)}
* or have restored completely or removed will not be returned; similarly tasks that have just being
* removed via {@link StateUpdater#remove(TaskId)} maybe returned still.
* or have restored completely or removed will not be returned; tasks that have just being
* removed via {@link StateUpdater#remove(TaskId)} may still be returned.
*
* @return set of all updating tasks inside the state updater
*/
@ -250,9 +215,8 @@ public interface StateUpdater {
* and the task was not removed from the state updater with one of the following methods:
* <ul>
* <li>{@link StateUpdater#drainRestoredActiveTasks(Duration)}</li>
* <li>{@link StateUpdater#drainRemovedTasks()}</li>
* <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
* <li>{@link StateUpdater#removeWithFuture(org.apache.kafka.streams.processor.TaskId)}</li>
* <li>{@link StateUpdater#remove(org.apache.kafka.streams.processor.TaskId)}</li>
* </ul>
*
* @return {@code true} if the state updater restores active tasks, {@code false} otherwise
@ -269,7 +233,6 @@ public interface StateUpdater {
* The state updater manages all standby tasks that were added with the {@link StateUpdater#add(Task)} and that have
* not been removed from the state updater with one of the following methods:
* <ul>
* <li>{@link StateUpdater#drainRemovedTasks()}</li>
* <li>{@link StateUpdater#drainExceptionsAndFailedTasks()}</li>
* </ul>
*

View File

@ -601,25 +601,25 @@ public class TaskManager {
if (activeTasksToCreate.containsKey(taskId)) {
if (task.isActive()) {
if (!task.inputPartitions().equals(activeTasksToCreate.get(taskId))) {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(taskId);
futuresForUpdatingInputPartitions.put(taskId, future);
newInputPartitions.put(taskId, activeTasksToCreate.get(taskId));
}
} else {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(taskId);
futuresForStandbyTasksToRecycle.put(taskId, future);
activeInputPartitions.put(taskId, activeTasksToCreate.get(taskId));
}
activeTasksToCreate.remove(taskId);
} else if (standbyTasksToCreate.containsKey(taskId)) {
if (task.isActive()) {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(taskId);
futuresForActiveTasksToRecycle.put(taskId, future);
standbyInputPartitions.put(taskId, standbyTasksToCreate.get(taskId));
}
standbyTasksToCreate.remove(taskId);
} else {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(taskId);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(taskId);
futuresForTasksToClose.put(taskId, future);
}
}
@ -1170,7 +1170,7 @@ public class TaskManager {
for (final Task restoringTask : stateUpdater.getTasks()) {
if (restoringTask.isActive()) {
if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
futures.put(restoringTask.id(), stateUpdater.removeWithFuture(restoringTask.id()));
futures.put(restoringTask.id(), stateUpdater.remove(restoringTask.id()));
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
}
}
@ -1241,7 +1241,7 @@ public class TaskManager {
final Set<Task> tasksToCloseDirty = new HashSet<>();
for (final Task restoringTask : stateUpdater.getTasks()) {
if (restoringTask.isActive()) {
futures.put(restoringTask.id(), stateUpdater.removeWithFuture(restoringTask.id()));
futures.put(restoringTask.id(), stateUpdater.remove(restoringTask.id()));
}
}
@ -1484,7 +1484,7 @@ public class TaskManager {
if (stateUpdater != null) {
final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new LinkedHashMap<>();
for (final Task task : stateUpdater.getTasks()) {
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
futures.put(task.id(), future);
}
final Set<Task> tasksToCloseClean = new HashSet<>();

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.PendingUpdateAction.Action;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.slf4j.Logger;
@ -57,7 +56,6 @@ class Tasks implements TasksRegistry {
private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate = new HashMap<>();
private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate = new HashMap<>();
private final Set<Task> pendingTasksToInit = new HashSet<>();
private final Map<TaskId, PendingUpdateAction> pendingUpdateActions = new HashMap<>();
// TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();
@ -102,103 +100,6 @@ class Tasks implements TasksRegistry {
pendingStandbyTasksToCreate.putAll(pendingTasks);
}
@Override
public Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId) {
return removePendingUpdateActionWithInputPartitions(taskId, Action.RECYCLE);
}
@Override
public void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
updatePendingUpdateAction(taskId, PendingUpdateAction.createRecycleTask(inputPartitions));
}
@Override
public boolean hasPendingTasksToRecycle() {
return pendingUpdateActions.values().stream().anyMatch(action -> action.getAction() == Action.RECYCLE);
}
@Override
public Set<TopicPartition> removePendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId) {
return removePendingUpdateActionWithInputPartitions(taskId, Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS);
}
@Override
public void addPendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
updatePendingUpdateAction(taskId, PendingUpdateAction.createCloseReviveAndUpdateInputPartition(inputPartitions));
}
@Override
public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
return removePendingUpdateActionWithInputPartitions(taskId, Action.UPDATE_INPUT_PARTITIONS);
}
@Override
public void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
updatePendingUpdateAction(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions));
}
@Override
public boolean removePendingTaskToAddBack(final TaskId taskId) {
return removePendingUpdateAction(taskId, Action.ADD_BACK);
}
@Override
public void addPendingTaskToAddBack(final TaskId taskId) {
updatePendingUpdateAction(taskId, PendingUpdateAction.createAddBack());
}
@Override
public boolean removePendingTaskToCloseClean(final TaskId taskId) {
return removePendingUpdateAction(taskId, Action.CLOSE_CLEAN);
}
@Override
public void addPendingTaskToCloseClean(final TaskId taskId) {
updatePendingUpdateAction(taskId, PendingUpdateAction.createCloseClean());
}
@Override
public boolean removePendingActiveTaskToSuspend(final TaskId taskId) {
return removePendingUpdateAction(taskId, Action.SUSPEND);
}
@Override
public void addPendingActiveTaskToSuspend(final TaskId taskId) {
updatePendingUpdateAction(taskId, PendingUpdateAction.createSuspend());
}
private Set<TopicPartition> removePendingUpdateActionWithInputPartitions(final TaskId taskId, final Action action) {
if (containsTaskIdWithAction(taskId, action)) {
final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.remove(taskId);
log.info("Removing pending update action {} for task {}", taskId, pendingUpdateAction);
return pendingUpdateAction.getInputPartitions();
}
return null;
}
private boolean removePendingUpdateAction(final TaskId taskId, final Action action) {
if (containsTaskIdWithAction(taskId, action)) {
log.info("Removing pending update action {} for task {}", taskId, pendingUpdateActions.remove(taskId));
return true;
}
return false;
}
private void updatePendingUpdateAction(final TaskId taskId, final PendingUpdateAction newAction) {
if (pendingUpdateActions.containsKey(taskId)) {
log.info("Adding pending update action {} for task {}, previous action was {}",
newAction, taskId, pendingUpdateActions.get(taskId));
} else {
log.info("Adding pending update action {} for task {}, no previous action", newAction, taskId);
}
pendingUpdateActions.put(taskId, newAction);
}
private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
return pendingUpdateAction != null && pendingUpdateAction.getAction() == action;
}
@Override
public Set<Task> drainPendingTasksToInit() {
final Set<Task> result = new HashSet<>(pendingTasksToInit);

View File

@ -35,28 +35,6 @@ public interface TasksRegistry {
void clearPendingTasksToCreate();
Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId);
boolean hasPendingTasksToRecycle();
void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions);
Set<TopicPartition> removePendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId);
void addPendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions);
Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId);
void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions);
boolean removePendingTaskToAddBack(final TaskId taskId);
void addPendingTaskToAddBack(final TaskId taskId);
boolean removePendingTaskToCloseClean(final TaskId taskId);
void addPendingTaskToCloseClean(final TaskId taskId);
Set<Task> drainPendingTasksToInit();
Set<Task> pendingTasksToInit();
@ -65,10 +43,6 @@ public interface TasksRegistry {
boolean hasPendingTasksToInit();
boolean removePendingActiveTaskToSuspend(final TaskId taskId);
void addPendingActiveTaskToSuspend(final TaskId taskId);
void addActiveTasks(final Collection<Task> tasks);
void addStandbyTasks(final Collection<Task> tasks);

View File

@ -440,7 +440,7 @@ class DefaultStateUpdaterTest {
.thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.removeWithFuture(task.id()).get();
stateUpdater.remove(task.id()).get();
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
@ -707,8 +707,8 @@ class DefaultStateUpdaterTest {
stateUpdater.add(standbyTask);
verifyUpdatingTasks(activeTask1, activeTask2, standbyTask);
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = stateUpdater.removeWithFuture(activeTask1.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = stateUpdater.removeWithFuture(activeTask2.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = stateUpdater.remove(activeTask1.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = stateUpdater.remove(activeTask2.id());
CompletableFuture.allOf(future1, future2).get();
final InOrder orderVerifier = inOrder(changelogReader);
@ -727,7 +727,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(standbyTask2);
verifyUpdatingTasks(standbyTask1, standbyTask2);
stateUpdater.removeWithFuture(standbyTask2.id()).get();
stateUpdater.remove(standbyTask2.id()).get();
verify(changelogReader).transitToUpdateStandby();
}
@ -751,7 +751,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(task);
verifyUpdatingTasks(task);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
assertEquals(new StateUpdater.RemovedTaskResult(task), future.get());
verifyCheckpointTasks(true, task);
@ -768,7 +768,7 @@ class DefaultStateUpdaterTest {
final StreamsException streamsException = new StreamsException("Something happened", task.id());
setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(task, streamsException);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
verifyRemovingUpdatingStatefulTaskFails(future, task, streamsException, true);
@ -780,7 +780,7 @@ class DefaultStateUpdaterTest {
final RuntimeException runtimeException = new RuntimeException("Something happened");
setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(task, runtimeException);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
verifyRemovingUpdatingStatefulTaskFails(future, task, runtimeException, false);
}
@ -791,7 +791,7 @@ class DefaultStateUpdaterTest {
final StreamsException streamsException = new StreamsException("Something happened", task.id());
setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(task, streamsException);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
verifyRemovingUpdatingStatefulTaskFails(future, task, streamsException, true);
}
@ -802,7 +802,7 @@ class DefaultStateUpdaterTest {
final RuntimeException runtimeException = new RuntimeException("Something happened");
setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(task, runtimeException);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
verifyRemovingUpdatingStatefulTaskFails(future, task, runtimeException, false);
}
@ -843,8 +843,8 @@ class DefaultStateUpdaterTest {
verifyPausedTasks(statefulTask, standbyTask);
verifyUpdatingTasks();
final CompletableFuture<StateUpdater.RemovedTaskResult> futureOfStatefulTask = stateUpdater.removeWithFuture(statefulTask.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> futureOfStandbyTask = stateUpdater.removeWithFuture(standbyTask.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> futureOfStatefulTask = stateUpdater.remove(statefulTask.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> futureOfStandbyTask = stateUpdater.remove(standbyTask.id());
assertEquals(new StateUpdater.RemovedTaskResult(statefulTask), futureOfStatefulTask.get());
assertEquals(new StateUpdater.RemovedTaskResult(standbyTask), futureOfStandbyTask.get());
@ -869,7 +869,7 @@ class DefaultStateUpdaterTest {
verifyPausedTasks(statefulTask);
verifyUpdatingTasks();
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(statefulTask.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(statefulTask.id());
final ExecutionException executionException = assertThrows(ExecutionException.class, future::get);
assertInstanceOf(StreamsException.class, executionException.getCause());
@ -899,7 +899,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(task);
verifyRestoredActiveTasks(task);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
future.get();
assertEquals(new StateUpdater.RemovedTaskResult(task), future.get());
@ -935,7 +935,7 @@ class DefaultStateUpdaterTest {
final ExceptionAndTask expectedExceptionAndTasks = new ExceptionAndTask(streamsException, task);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(task.id());
assertEquals(new StateUpdater.RemovedTaskResult(task, streamsException), future.get());
verifyPausedTasks();
@ -965,7 +965,7 @@ class DefaultStateUpdaterTest {
verifyUpdatingTasks(updatingTask);
verifyPausedTasks();
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(TASK_1_0);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(TASK_1_0);
assertNull(future.get());
verifyRestoredActiveTasks(restoredTask);
@ -978,7 +978,7 @@ class DefaultStateUpdaterTest {
public void shouldCompleteWithNullIfNoTasks() throws Exception {
stateUpdater.start();
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(TASK_0_1);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.remove(TASK_0_1);
assertNull(future.get());
assertTrue(stateUpdater.isRunning());
@ -1475,7 +1475,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(activeTask1);
stateUpdater.add(standbyTask1);
stateUpdater.add(standbyTask2);
stateUpdater.removeWithFuture(TASK_0_0);
stateUpdater.remove(TASK_0_0);
stateUpdater.add(activeTask2);
stateUpdater.add(standbyTask3);

View File

@ -441,7 +441,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(activeTaskToClose.id())).thenReturn(future);
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
@ -462,7 +462,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(activeTaskToClose.id())).thenReturn(future);
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(activeTaskToClose, new RuntimeException("KABOOM!")));
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
@ -484,7 +484,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(standbyTaskToClose.id())).thenReturn(future);
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(standbyTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
@ -504,7 +504,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(standbyTaskToClose.id())).thenReturn(future);
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(standbyTaskToClose, new RuntimeException("KABOOM!")));
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
@ -525,7 +525,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(failedStandbyTask));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(failedStandbyTask.id())).thenReturn(future);
when(stateUpdater.remove(failedStandbyTask.id())).thenReturn(future);
final RuntimeException kaboom = new RuntimeException("KABOOM!");
future.completeExceptionally(kaboom);
when(stateUpdater.drainExceptionsAndFailedTasks())
@ -552,7 +552,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(activeTaskToUpdateInputPartitions.id())).thenReturn(future);
when(stateUpdater.remove(activeTaskToUpdateInputPartitions.id())).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(activeTaskToUpdateInputPartitions));
taskManager.handleAssignment(
@ -582,7 +582,7 @@ public class TaskManagerTest {
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId03Partitions))
.thenReturn(recycledStandbyTask);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(taskId03)).thenReturn(future);
when(stateUpdater.remove(taskId03)).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(activeTaskToRecycle));
taskManager.handleAssignment(
@ -606,7 +606,7 @@ public class TaskManagerTest {
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, activeTaskToRecycle.inputPartitions()))
.thenThrow(new RuntimeException());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(activeTaskToRecycle.id())).thenReturn(future);
when(stateUpdater.remove(activeTaskToRecycle.id())).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(activeTaskToRecycle));
assertThrows(
@ -636,7 +636,7 @@ public class TaskManagerTest {
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, taskId03Partitions, consumer))
.thenReturn(recycledActiveTask);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(standbyTaskToRecycle.id())).thenReturn(future);
when(stateUpdater.remove(standbyTaskToRecycle.id())).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(standbyTaskToRecycle));
taskManager.handleAssignment(
@ -663,7 +663,7 @@ public class TaskManagerTest {
consumer))
.thenThrow(new RuntimeException());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(standbyTaskToRecycle.id())).thenReturn(future);
when(stateUpdater.remove(standbyTaskToRecycle.id())).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(standbyTaskToRecycle));
assertThrows(
@ -693,7 +693,7 @@ public class TaskManagerTest {
Collections.emptyMap()
);
verify(stateUpdater, never()).removeWithFuture(reassignedActiveTask.id());
verify(stateUpdater, never()).remove(reassignedActiveTask.id());
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@ -731,8 +731,7 @@ public class TaskManagerTest {
Collections.emptyMap(),
mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), taskId03Partitions))
);
verify(stateUpdater, never()).removeWithFuture(standbyTaskToUpdateInputPartitions.id());
verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@ -770,12 +769,12 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose, standbyTaskToRecycle));
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForActiveTaskToClose = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose);
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose);
futureForActiveTaskToClose.complete(new StateUpdater.RemovedTaskResult(activeTaskToClose));
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, taskId02Partitions, consumer))
.thenReturn(recycledActiveTask);
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForStandbyTaskToRecycle = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(standbyTaskToRecycle.id())).thenReturn(futureForStandbyTaskToRecycle);
when(stateUpdater.remove(standbyTaskToRecycle.id())).thenReturn(futureForStandbyTaskToRecycle);
futureForStandbyTaskToRecycle.complete(new StateUpdater.RemovedTaskResult(standbyTaskToRecycle));
taskManager.handleAssignment(
@ -1147,7 +1146,6 @@ public class TaskManagerTest {
StreamsException.class,
() -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
);
verify(stateUpdater, never()).add(task00);
verify(tasks).addTask(task00);
assertTrue(streamsException.taskId().isPresent());
@ -1220,14 +1218,14 @@ public class TaskManagerTest {
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task), tasks);
when(stateUpdater.getTasks()).thenReturn(mkSet(task));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task.id())).thenReturn(future);
when(stateUpdater.remove(task.id())).thenReturn(future);
future.complete(new StateUpdater.RemovedTaskResult(task));
taskManager.handleRevocation(task.inputPartitions());
verify(task).suspend();
verify(tasks).addTask(task);
verify(stateUpdater).removeWithFuture(task.id());
verify(stateUpdater).remove(task.id());
}
@Test
@ -1241,10 +1239,10 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2), tasks);
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1);
when(stateUpdater.remove(task1.id())).thenReturn(future1);
future1.complete(new StateUpdater.RemovedTaskResult(task1));
final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task2.id())).thenReturn(future2);
when(stateUpdater.remove(task2.id())).thenReturn(future2);
future2.complete(new StateUpdater.RemovedTaskResult(task2));
taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions));
@ -1267,7 +1265,7 @@ public class TaskManagerTest {
verify(task, never()).suspend();
verify(tasks, never()).addTask(task);
verify(stateUpdater, never()).removeWithFuture(task.id());
verify(stateUpdater, never()).remove(task.id());
}
@Test
@ -1282,7 +1280,7 @@ public class TaskManagerTest {
verify(task, never()).suspend();
verify(tasks, never()).addTask(task);
verify(stateUpdater, never()).removeWithFuture(task.id());
verify(stateUpdater, never()).remove(task.id());
}
@Test
@ -1296,10 +1294,10 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2), tasks);
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1);
when(stateUpdater.remove(task1.id())).thenReturn(future1);
future1.complete(new StateUpdater.RemovedTaskResult(task1));
final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task2.id())).thenReturn(future2);
when(stateUpdater.remove(task2.id())).thenReturn(future2);
final StreamsException streamsException = new StreamsException("Something happened");
future2.complete(new StateUpdater.RemovedTaskResult(task2, streamsException));
@ -1329,10 +1327,10 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2, task3), tasks);
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1);
when(stateUpdater.remove(task1.id())).thenReturn(future1);
future1.complete(new StateUpdater.RemovedTaskResult(task1));
final CompletableFuture<StateUpdater.RemovedTaskResult> future3 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task3.id())).thenReturn(future3);
when(stateUpdater.remove(task3.id())).thenReturn(future3);
future3.complete(new StateUpdater.RemovedTaskResult(task3));
taskManager.handleLostAll();
@ -1341,7 +1339,7 @@ public class TaskManagerTest {
verify(task1).closeClean();
verify(task3).suspend();
verify(task3).closeClean();
verify(stateUpdater, never()).removeWithFuture(task2.id());
verify(stateUpdater, never()).remove(task2.id());
}
@Test
@ -1355,10 +1353,10 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setupForRevocationAndLost(mkSet(task1, task2), tasks);
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task1.id())).thenReturn(future1);
when(stateUpdater.remove(task1.id())).thenReturn(future1);
future1.complete(new StateUpdater.RemovedTaskResult(task1, new StreamsException("Something happened")));
final CompletableFuture<StateUpdater.RemovedTaskResult> future3 = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(task2.id())).thenReturn(future3);
when(stateUpdater.remove(task2.id())).thenReturn(future3);
future3.complete(new StateUpdater.RemovedTaskResult(task2));
taskManager.handleLostAll();
@ -1446,10 +1444,8 @@ public class TaskManagerTest {
@Test
public void shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
}
@ -1502,8 +1498,6 @@ public class TaskManagerTest {
assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", thrown.getMessage());
}
@Test
public void shouldAddSubscribedTopicsFromAssignmentToTopologyMetadata() {
final Map<TaskId, Set<TopicPartition>> activeTasksAssignment = mkMap(
mkEntry(taskId01, mkSet(t1p1)),
@ -3412,13 +3406,13 @@ public class TaskManagerTest {
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStandbyTask = new CompletableFuture<>();
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStatefulTaskDuringRemoval = new CompletableFuture<>();
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemovedFailedStandbyTaskDuringRemoval = new CompletableFuture<>();
when(stateUpdater.removeWithFuture(removedStatefulTask.id())).thenReturn(futureForRemovedStatefulTask);
when(stateUpdater.removeWithFuture(removedStandbyTask.id())).thenReturn(futureForRemovedStandbyTask);
when(stateUpdater.removeWithFuture(removedFailedStatefulTask.id())).thenReturn(futureForRemovedFailedStatefulTask);
when(stateUpdater.removeWithFuture(removedFailedStandbyTask.id())).thenReturn(futureForRemovedFailedStandbyTask);
when(stateUpdater.removeWithFuture(removedFailedStatefulTaskDuringRemoval.id()))
when(stateUpdater.remove(removedStatefulTask.id())).thenReturn(futureForRemovedStatefulTask);
when(stateUpdater.remove(removedStandbyTask.id())).thenReturn(futureForRemovedStandbyTask);
when(stateUpdater.remove(removedFailedStatefulTask.id())).thenReturn(futureForRemovedFailedStatefulTask);
when(stateUpdater.remove(removedFailedStandbyTask.id())).thenReturn(futureForRemovedFailedStandbyTask);
when(stateUpdater.remove(removedFailedStatefulTaskDuringRemoval.id()))
.thenReturn(futureForRemovedFailedStatefulTaskDuringRemoval);
when(stateUpdater.removeWithFuture(removedFailedStandbyTaskDuringRemoval.id()))
when(stateUpdater.remove(removedFailedStandbyTaskDuringRemoval.id()))
.thenReturn(futureForRemovedFailedStandbyTaskDuringRemoval);
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(
new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStatefulTaskDuringRemoval),

View File

@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@ -35,8 +34,6 @@ import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -48,10 +45,7 @@ public class TasksTest {
private final static TopicPartition TOPIC_PARTITION_B_1 = new TopicPartition("topicB", 1);
private final static TaskId TASK_0_0 = new TaskId(0, 0);
private final static TaskId TASK_0_1 = new TaskId(0, 1);
private final static TaskId TASK_0_2 = new TaskId(0, 2);
private final static TaskId TASK_1_0 = new TaskId(1, 0);
private final static TaskId TASK_1_1 = new TaskId(1, 1);
private final static TaskId TASK_1_2 = new TaskId(1, 2);
private final Tasks tasks = new Tasks(new LogContext());
@ -130,41 +124,6 @@ public class TasksTest {
assertEquals(Collections.emptyMap(), tasks.drainPendingStandbyTasksForTopologies(mkSet("B")));
}
@Test
public void shouldAddAndRemovePendingTaskToRecycle() {
final Set<TopicPartition> expectedInputPartitions = mkSet(TOPIC_PARTITION_A_0);
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
tasks.addPendingTaskToRecycle(TASK_0_0, expectedInputPartitions);
final Set<TopicPartition> actualInputPartitions = tasks.removePendingTaskToRecycle(TASK_0_0);
assertEquals(expectedInputPartitions, actualInputPartitions);
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
}
@Test
public void shouldVerifyIfPendingTaskToRecycleExist() {
assertFalse(tasks.hasPendingTasksToRecycle());
tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
assertTrue(tasks.hasPendingTasksToRecycle());
tasks.addPendingTaskToRecycle(TASK_1_0, mkSet(TOPIC_PARTITION_A_1));
assertTrue(tasks.hasPendingTasksToRecycle());
tasks.addPendingTaskToCloseClean(TASK_0_1);
tasks.addPendingTaskToAddBack(TASK_0_2);
tasks.addPendingTaskToUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0));
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0));
tasks.addPendingActiveTaskToSuspend(TASK_1_2);
assertTrue(tasks.hasPendingTasksToRecycle());
tasks.removePendingTaskToRecycle(TASK_0_0);
assertTrue(tasks.hasPendingTasksToRecycle());
tasks.removePendingTaskToRecycle(TASK_1_0);
assertFalse(tasks.hasPendingTasksToRecycle());
}
@Test
public void shouldVerifyIfPendingTaskToInitExist() {
assertFalse(tasks.hasPendingTasksToInit());
@ -177,173 +136,9 @@ public class TasksTest {
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
assertTrue(tasks.hasPendingTasksToInit());
tasks.addPendingTaskToCloseClean(TASK_0_1);
tasks.addPendingTaskToAddBack(TASK_0_2);
tasks.addPendingTaskToUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0));
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0));
tasks.addPendingActiveTaskToSuspend(TASK_1_2);
assertTrue(tasks.hasPendingTasksToInit());
tasks.drainPendingTasksToInit();
assertFalse(tasks.hasPendingTasksToInit());
}
@Test
public void shouldAddAndRemovePendingTaskToUpdateInputPartitions() {
final Set<TopicPartition> expectedInputPartitions = mkSet(TOPIC_PARTITION_A_0);
assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
tasks.addPendingTaskToUpdateInputPartitions(TASK_0_0, expectedInputPartitions);
final Set<TopicPartition> actualInputPartitions = tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0);
assertEquals(expectedInputPartitions, actualInputPartitions);
assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
}
@Test
public void shouldAddAndRemovePendingTaskToCloseReviveAndUpdateInputPartitions() {
final Set<TopicPartition> expectedInputPartitions = mkSet(TOPIC_PARTITION_A_0);
assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0, expectedInputPartitions);
final Set<TopicPartition> actualInputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0);
assertEquals(expectedInputPartitions, actualInputPartitions);
assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
}
@Test
public void shouldAddAndRemovePendingTaskToCloseClean() {
assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0));
tasks.addPendingTaskToCloseClean(TASK_0_0);
assertTrue(tasks.removePendingTaskToCloseClean(TASK_0_0));
assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0));
}
@Test
public void shouldAddAndRemovePendingTaskToAddBack() {
assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0));
tasks.addPendingTaskToAddBack(TASK_0_0);
assertTrue(tasks.removePendingTaskToAddBack(TASK_0_0));
assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0));
}
@Test
public void shouldAddAndRemovePendingTaskToSuspend() {
assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
tasks.addPendingActiveTaskToSuspend(TASK_0_0);
assertTrue(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
}
@Test
public void onlyRemovePendingTaskToRecycleShouldRemoveTaskFromPendingUpdateActions() {
tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0));
assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0));
assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
assertNotNull(tasks.removePendingTaskToRecycle(TASK_0_0));
assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
}
@Test
public void onlyRemovePendingTaskToCloseReviveAndUpdateInputPartitionsShouldRemoveTaskFromPendingUpdateActions() {
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0));
assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0));
assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
assertNotNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
}
@Test
public void onlyRemovePendingTaskToUpdateInputPartitionsShouldRemoveTaskFromPendingUpdateActions() {
tasks.addPendingTaskToUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0));
assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0));
assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
assertNotNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
}
@Test
public void onlyRemovePendingTaskToCloseCleanShouldRemoveTaskFromPendingUpdateActions() {
tasks.addPendingTaskToCloseClean(TASK_0_0);
assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0));
assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
assertTrue(tasks.removePendingTaskToCloseClean(TASK_0_0));
assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
}
@Test
public void onlyRemovePendingTaskToAddBackShouldRemoveTaskFromPendingUpdateActions() {
tasks.addPendingTaskToAddBack(TASK_0_0);
assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0));
assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
assertTrue(tasks.removePendingTaskToAddBack(TASK_0_0));
assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
}
@Test
public void onlyRemovePendingTaskToSuspendShouldRemoveTaskFromPendingUpdateActions() {
tasks.addPendingActiveTaskToSuspend(TASK_0_0);
assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0));
assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0));
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
assertTrue(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
}
@Test
public void shouldOnlyKeepLastUpdateAction() {
tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
tasks.addPendingTaskToUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
assertNotNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
tasks.addPendingTaskToUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
tasks.addPendingTaskToCloseClean(TASK_0_0);
assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0));
assertTrue(tasks.removePendingTaskToCloseClean(TASK_0_0));
tasks.addPendingTaskToCloseClean(TASK_0_0);
tasks.addPendingTaskToAddBack(TASK_0_0);
assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0));
assertTrue(tasks.removePendingTaskToAddBack(TASK_0_0));
tasks.addPendingTaskToAddBack(TASK_0_0);
tasks.addPendingActiveTaskToSuspend(TASK_0_0);
assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0));
assertTrue(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
tasks.addPendingActiveTaskToSuspend(TASK_0_0);
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0));
assertNotNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0));
assertNotNull(tasks.removePendingTaskToRecycle(TASK_0_0));
}
}