KAFKA-10199: Add remove operation with future to state updater (#15852)

Adds a remove operation to the state updater that returns a future
instead of adding the removed tasks to an output queue. Code that
uses the state updater can then wait on the future.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Bruno Cadonna 2024-05-06 11:27:40 +02:00 committed by GitHub
parent 55a00be4e9
commit 366aeab488
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 472 additions and 269 deletions

View File

@ -44,6 +44,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@ -51,6 +52,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@ -191,14 +193,20 @@ public class DefaultStateUpdater implements StateUpdater {
tasksAndActionsLock.lock();
try {
for (final TaskAndAction taskAndAction : getTasksAndActions()) {
final Action action = taskAndAction.getAction();
final Action action = taskAndAction.action();
switch (action) {
case ADD:
addTask(taskAndAction.getTask());
addTask(taskAndAction.task());
break;
case REMOVE:
removeTask(taskAndAction.getTaskId());
if (taskAndAction.futureForRemove() == null) {
removeTask(taskAndAction.taskId());
} else {
removeTask(taskAndAction.taskId(), taskAndAction.futureForRemove());
}
break;
default:
throw new IllegalStateException("Unknown action type " + action);
}
}
} finally {
@ -496,6 +504,109 @@ public class DefaultStateUpdater implements StateUpdater {
}
}
private void removeTask(final TaskId taskId, final CompletableFuture<RemovedTaskResult> future) {
try {
if (!removeUpdatingTask(taskId, future)) {
if (!removePausedTask(taskId, future)) {
if (!removeRestoredTask(taskId, future)) {
if (!removeFailedTask(taskId, future)) {
future.complete(null);
log.warn("Task " + taskId + " could not be removed from the state updater because "
+ "the state updater does not own this task.");
}
}
}
}
} catch (final StreamsException streamsException) {
handleStreamsException(streamsException);
future.completeExceptionally(streamsException);
} catch (final RuntimeException runtimeException) {
handleRuntimeException(runtimeException);
future.completeExceptionally(runtimeException);
}
}
private boolean removeUpdatingTask(final TaskId taskId, final CompletableFuture<RemovedTaskResult> future) {
if (!updatingTasks.containsKey(taskId)) {
return false;
}
final Task task = updatingTasks.get(taskId);
prepareUpdatingTaskForRemoval(task);
updatingTasks.remove(taskId);
if (task.isActive()) {
transitToUpdateStandbysIfOnlyStandbysLeft();
}
log.info((task.isActive() ? "Active" : "Standby")
+ " task " + task.id() + " was removed from the updating tasks.");
future.complete(new RemovedTaskResult(task));
return true;
}
private void prepareUpdatingTaskForRemoval(final Task task) {
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
changelogReader.unregister(changelogPartitions);
}
private boolean removePausedTask(final TaskId taskId, final CompletableFuture<RemovedTaskResult> future) {
if (!pausedTasks.containsKey(taskId)) {
return false;
}
final Task task = pausedTasks.get(taskId);
preparePausedTaskForRemoval(task);
pausedTasks.remove(taskId);
log.info((task.isActive() ? "Active" : "Standby")
+ " task " + task.id() + " was removed from the paused tasks.");
future.complete(new RemovedTaskResult(task));
return true;
}
private void preparePausedTaskForRemoval(final Task task) {
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
changelogReader.unregister(changelogPartitions);
}
private boolean removeRestoredTask(final TaskId taskId, final CompletableFuture<RemovedTaskResult> future) {
restoredActiveTasksLock.lock();
try {
final Iterator<StreamTask> iterator = restoredActiveTasks.iterator();
while (iterator.hasNext()) {
final StreamTask restoredTask = iterator.next();
if (restoredTask.id().equals(taskId)) {
iterator.remove();
log.info((restoredTask.isActive() ? "Active" : "Standby")
+ " task " + restoredTask.id() + " was removed from the restored tasks.");
future.complete(new RemovedTaskResult(restoredTask));
return true;
}
}
return false;
} finally {
restoredActiveTasksLock.unlock();
}
}
private boolean removeFailedTask(final TaskId taskId, final CompletableFuture<RemovedTaskResult> future) {
exceptionsAndFailedTasksLock.lock();
try {
final Iterator<ExceptionAndTask> iterator = exceptionsAndFailedTasks.iterator();
while (iterator.hasNext()) {
final ExceptionAndTask exceptionAndTask = iterator.next();
final Task failedTask = exceptionAndTask.task();
if (failedTask.id().equals(taskId)) {
iterator.remove();
log.info((failedTask.isActive() ? "Active" : "Standby")
+ " task " + failedTask.id() + " was removed from the failed tasks.");
future.complete(new RemovedTaskResult(failedTask, exceptionAndTask.exception()));
return true;
}
}
return false;
} finally {
exceptionsAndFailedTasksLock.unlock();
}
}
private void removeTask(final TaskId taskId) {
final Task task;
if (updatingTasks.containsKey(taskId)) {
@ -722,8 +833,8 @@ public class DefaultStateUpdater implements StateUpdater {
try {
TaskAndAction taskAndAction;
while ((taskAndAction = tasksAndActions.peek()) != null) {
if (taskAndAction.getAction() == Action.ADD) {
removedTasks.add(taskAndAction.getTask());
if (taskAndAction.action() == Action.ADD) {
removedTasks.add(taskAndAction.task());
}
tasksAndActions.poll();
}
@ -754,6 +865,19 @@ public class DefaultStateUpdater implements StateUpdater {
}
}
@Override
public CompletableFuture<RemovedTaskResult> removeWithFuture(final TaskId taskId) {
final CompletableFuture<RemovedTaskResult> future = new CompletableFuture<>();
tasksAndActionsLock.lock();
try {
tasksAndActions.add(TaskAndAction.createRemoveTask(taskId, future));
tasksAndActionsCondition.signalAll();
} finally {
tasksAndActionsLock.unlock();
}
return future;
}
@Override
public void remove(final TaskId taskId) {
tasksAndActionsLock.lock();
@ -909,6 +1033,10 @@ public class DefaultStateUpdater implements StateUpdater {
return stateUpdaterThread.restoreConsumerInstanceId(timeout);
}
public boolean isRunning() {
return stateUpdaterThread != null && stateUpdaterThread.isRunning.get();
}
// used for testing
boolean isIdle() {
if (stateUpdaterThread != null) {
@ -942,8 +1070,8 @@ public class DefaultStateUpdater implements StateUpdater {
return
Stream.concat(
tasksAndActions.stream()
.filter(taskAndAction -> taskAndAction.getAction() == Action.ADD)
.map(TaskAndAction::getTask),
.filter(taskAndAction -> taskAndAction.action() == Action.ADD)
.map(TaskAndAction::task),
Stream.concat(
getUpdatingTasks().stream(),
Stream.concat(

View File

@ -23,7 +23,9 @@ import org.apache.kafka.streams.processor.TaskId;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public interface StateUpdater {
@ -66,6 +68,50 @@ public interface StateUpdater {
}
}
class RemovedTaskResult {
private final Task task;
private final Optional<RuntimeException> exception;
public RemovedTaskResult(final Task task) {
this(task, null);
}
public RemovedTaskResult(final Task task, final RuntimeException exception) {
this.task = Objects.requireNonNull(task);
this.exception = Optional.ofNullable(exception);
}
public Task task() {
return task;
}
public Optional<RuntimeException> exception() {
return exception;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (!(o instanceof RemovedTaskResult)) return false;
final RemovedTaskResult that = (RemovedTaskResult) o;
return Objects.equals(task.id(), that.task.id()) && Objects.equals(exception, that.exception);
}
@Override
public int hashCode() {
return Objects.hash(task, exception);
}
@Override
public String toString() {
return "RemovedTaskResult{" +
"task=" + task.id() +
", exception=" + exception +
'}';
}
}
/**
* Starts the state updater.
*/
@ -103,6 +149,17 @@ public interface StateUpdater {
*/
void remove(final TaskId taskId);
/**
* Removes a task (active or standby) from the state updater.
*
* This method does not block until the removed task is removed from the state updater. But it returns a future on
* which processing can be blocked. The task to remove is removed from the updating tasks, paused tasks,
* restored tasks, or failed tasks.
*
* @param taskId ID of the task to remove
*/
CompletableFuture<RemovedTaskResult> removeWithFuture(final TaskId taskId);
/**
* Wakes up the state updater if it is currently dormant, to check if a paused task should be resumed.
*/

View File

@ -19,10 +19,11 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.TaskId;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
public class TaskAndAction {
enum Action {
public enum Action {
ADD,
REMOVE
}
@ -30,38 +31,57 @@ public class TaskAndAction {
private final Task task;
private final TaskId taskId;
private final Action action;
private final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemove;
private TaskAndAction(final Task task, final TaskId taskId, final Action action) {
private TaskAndAction(final Task task,
final TaskId taskId,
final Action action,
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemove) {
this.task = task;
this.taskId = taskId;
this.action = action;
this.futureForRemove = futureForRemove;
}
public static TaskAndAction createAddTask(final Task task) {
Objects.requireNonNull(task, "Task to add is null!");
return new TaskAndAction(task, null, Action.ADD);
return new TaskAndAction(task, null, Action.ADD, null);
}
public static TaskAndAction createRemoveTask(final TaskId taskId,
final CompletableFuture<StateUpdater.RemovedTaskResult> future) {
Objects.requireNonNull(taskId, "Task ID of task to remove is null!");
Objects.requireNonNull(future, "Future for task to remove is null!");
return new TaskAndAction(null, taskId, Action.REMOVE, future);
}
public static TaskAndAction createRemoveTask(final TaskId taskId) {
Objects.requireNonNull(taskId, "Task ID of task to remove is null!");
return new TaskAndAction(null, taskId, Action.REMOVE);
return new TaskAndAction(null, taskId, Action.REMOVE, null);
}
public Task getTask() {
public Task task() {
if (action != Action.ADD) {
throw new IllegalStateException("Action type " + action + " cannot have a task!");
}
return task;
}
public TaskId getTaskId() {
public TaskId taskId() {
if (action != Action.REMOVE) {
throw new IllegalStateException("Action type " + action + " cannot have a task ID!");
}
return taskId;
}
public Action getAction() {
public CompletableFuture<StateUpdater.RemovedTaskResult> futureForRemove() {
if (action != Action.REMOVE) {
throw new IllegalStateException("Action type " + action + " cannot have a future with a single result!");
}
return futureForRemove;
}
public Action action() {
return action;
}
}

View File

@ -36,6 +36,7 @@ import org.mockito.InOrder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
@ -43,6 +44,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -62,6 +66,7 @@ import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyBoolean;
@ -151,7 +156,7 @@ class DefaultStateUpdaterTest {
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
stateUpdater.add(statelessTask);
stateUpdater.add(statefulTask);
stateUpdater.remove(TASK_1_1);
stateUpdater.removeWithFuture(TASK_1_1);
stateUpdater.add(standbyTask);
verifyRemovedTasks();
@ -280,6 +285,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(task2);
verifyFailedTasks(IllegalStateException.class, task1);
assertFalse(stateUpdater.isRunning());
}
@Test
@ -306,7 +312,6 @@ class DefaultStateUpdaterTest {
verifyNeverCheckpointTasks(tasks);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
}
@ -330,7 +335,6 @@ class DefaultStateUpdaterTest {
verifyCheckpointTasks(true, task);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader).register(task.changelogPartitions(), task.stateManager());
verify(changelogReader).unregister(task.changelogPartitions());
@ -344,16 +348,16 @@ class DefaultStateUpdaterTest {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final AtomicBoolean allChangelogCompleted = new AtomicBoolean(false);
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_C_0))
.thenReturn(mkSet(TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0))
.thenReturn(mkSet(TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted())
.thenReturn(false)
.thenReturn(false)
.thenReturn(false)
.thenReturn(true);
.thenAnswer(invocation -> {
allChangelogCompleted.set(true);
return mkSet(TOPIC_PARTITION_C_0, TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0);
});
when(changelogReader.allChangelogsCompleted()).thenReturn(allChangelogCompleted.get());
stateUpdater.start();
stateUpdater.add(task1);
@ -364,7 +368,6 @@ class DefaultStateUpdaterTest {
verifyCheckpointTasks(true, task3, task1, task2);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader).register(task1.changelogPartitions(), task1.stateManager());
verify(changelogReader).register(task2.changelogPartitions(), task2.stateManager());
@ -399,7 +402,6 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
verifyUpdatingTasks(task);
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertTrue(stateUpdater.restoresActiveTasks());
@ -418,14 +420,13 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks(task);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertTrue(stateUpdater.restoresActiveTasks());
}
@Test
public void shouldReturnTrueForRestoreActiveTasksIfTaskRemoved() throws Exception {
public void shouldReturnFalseForRestoreActiveTasksIfTaskRemoved() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
@ -434,14 +435,13 @@ class DefaultStateUpdaterTest {
.thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.remove(task.id());
stateUpdater.removeWithFuture(task.id()).get();
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks(task);
verifyPausedTasks();
assertTrue(stateUpdater.restoresActiveTasks());
assertFalse(stateUpdater.restoresActiveTasks());
}
@Test
@ -459,7 +459,6 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, task));
verifyRemovedTasks();
verifyPausedTasks();
assertTrue(stateUpdater.restoresActiveTasks());
@ -480,7 +479,6 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks(task);
assertTrue(stateUpdater.restoresActiveTasks());
@ -500,7 +498,6 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertFalse(stateUpdater.restoresActiveTasks());
@ -519,7 +516,6 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
verifyUpdatingTasks(task);
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertFalse(stateUpdater.restoresActiveTasks());
@ -572,7 +568,6 @@ class DefaultStateUpdaterTest {
verifyUpdatingStandbyTasks(tasks);
verifyRestoredActiveTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
for (final StandbyTask task : tasks) {
verify(changelogReader).register(task.changelogPartitions(), task.stateManager());
@ -604,7 +599,6 @@ class DefaultStateUpdaterTest {
verifyCheckpointTasks(true, task2, task1);
verifyUpdatingStandbyTasks(task4, task3);
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
verifyPausedTasks();
verify(changelogReader).register(task1.changelogPartitions(), task1.stateManager());
verify(changelogReader).register(task2.changelogPartitions(), task2.stateManager());
@ -673,14 +667,14 @@ class DefaultStateUpdaterTest {
orderVerifier.verify(changelogReader, atLeast(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader).transitToUpdateStandby();
}
@Test
public void shouldNotTransitToStandbyAgainAfterStandbyTaskFailed() throws Exception {
final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
final StandbyTask task2 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2)
mkEntry(task1.id(), task1),
mkEntry(task2.id(), task2)
);
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(task1.id()));
final ExceptionAndTask expectedExceptionAndTasks = new ExceptionAndTask(taskCorruptedException, task1);
@ -708,10 +702,10 @@ class DefaultStateUpdaterTest {
stateUpdater.add(standbyTask);
verifyUpdatingTasks(activeTask1, activeTask2, standbyTask);
stateUpdater.remove(activeTask1.id());
stateUpdater.remove(activeTask2.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future1 = stateUpdater.removeWithFuture(activeTask1.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future2 = stateUpdater.removeWithFuture(activeTask2.id());
CompletableFuture.allOf(future1, future2).get();
verifyRemovedTasks(activeTask1, activeTask2);
final InOrder orderVerifier = inOrder(changelogReader);
orderVerifier.verify(changelogReader, atLeast(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader).transitToUpdateStandby();
@ -728,33 +722,33 @@ class DefaultStateUpdaterTest {
stateUpdater.add(standbyTask2);
verifyUpdatingTasks(standbyTask1, standbyTask2);
stateUpdater.remove(standbyTask2.id());
stateUpdater.removeWithFuture(standbyTask2.id()).get();
verifyRemovedTasks(standbyTask2);
verify(changelogReader).transitToUpdateStandby();
}
@Test
public void shouldRemoveActiveStatefulTask() throws Exception {
public void shouldRemoveUpdatingActiveStatefulTask() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldRemoveStatefulTask(task);
shouldRemoveUpdatingStatefulTask(task);
}
@Test
public void shouldRemoveStandbyTask() throws Exception {
public void shouldRemoveUpdatingStandbyTask() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldRemoveStatefulTask(task);
shouldRemoveUpdatingStatefulTask(task);
}
private void shouldRemoveStatefulTask(final Task task) throws Exception {
private void shouldRemoveUpdatingStatefulTask(final Task task) throws Exception {
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
verifyUpdatingTasks(task);
stateUpdater.remove(task.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
verifyRemovedTasks(task);
assertEquals(new StateUpdater.RemovedTaskResult(task), future.get());
verifyCheckpointTasks(true, task);
verifyRestoredActiveTasks();
verifyUpdatingTasks();
@ -764,104 +758,227 @@ class DefaultStateUpdaterTest {
}
@Test
public void shouldRemovePausedTask() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
public void shouldThrowIfRemovingUpdatingActiveTaskFailsWithStreamsException() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(task, streamsException);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
verifyRemovingUpdatingStatefulTaskFails(future, task, streamsException, true);
}
@Test
public void shouldThrowIfRemovingUpdatingActiveTaskFailsWithRuntimeException() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final RuntimeException runtimeException = new RuntimeException("Something happened");
setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(task, runtimeException);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
verifyRemovingUpdatingStatefulTaskFails(future, task, runtimeException, false);
}
@Test
public void shouldThrowIfRemovingUpdatingStandbyTaskFailsWithStreamsException() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(task, streamsException);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
verifyRemovingUpdatingStatefulTaskFails(future, task, streamsException, true);
}
@Test
public void shouldThrowIfRemovingUpdatingStandbyTaskFailsWithRuntimeException() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
final RuntimeException runtimeException = new RuntimeException("Something happened");
setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(task, runtimeException);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
verifyRemovingUpdatingStatefulTaskFails(future, task, runtimeException, false);
}
private void setupShouldThrowIfRemovingUpdatingStatefulTaskFailsWithException(final Task task,
final RuntimeException exception) throws Exception {
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
doThrow(exception).when(changelogReader).unregister(changelogPartitions);
stateUpdater.start();
stateUpdater.add(task1);
stateUpdater.add(task2);
verifyUpdatingTasks(task1, task2);
stateUpdater.add(task);
verifyUpdatingTasks(task);
}
private void verifyRemovingUpdatingStatefulTaskFails(final CompletableFuture<StateUpdater.RemovedTaskResult> future,
final Task task,
final RuntimeException exception,
final boolean shouldStillBeRunning) throws Exception {
final ExecutionException executionException = assertThrows(ExecutionException.class, future::get);
assertInstanceOf(RuntimeException.class, executionException.getCause());
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyPausedTasks();
verifyExceptionsAndFailedTasks(new ExceptionAndTask(exception, task));
assertEquals(shouldStillBeRunning, stateUpdater.isRunning());
}
@Test
public void shouldRemovePausedTask() throws Exception {
final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(statefulTask);
stateUpdater.add(standbyTask);
verifyUpdatingTasks(statefulTask, standbyTask);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(task1, task2);
verifyRemovedTasks();
verifyPausedTasks(statefulTask, standbyTask);
verifyUpdatingTasks();
stateUpdater.remove(task1.id());
stateUpdater.remove(task2.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> futureOfStatefulTask = stateUpdater.removeWithFuture(statefulTask.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> futureOfStandbyTask = stateUpdater.removeWithFuture(standbyTask.id());
verifyRemovedTasks(task1, task2);
assertEquals(new StateUpdater.RemovedTaskResult(statefulTask), futureOfStatefulTask.get());
assertEquals(new StateUpdater.RemovedTaskResult(standbyTask), futureOfStandbyTask.get());
verifyPausedTasks();
verifyCheckpointTasks(true, task1, task2);
verifyCheckpointTasks(true, statefulTask, standbyTask);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verify(changelogReader).unregister(task1.changelogPartitions());
verify(changelogReader).unregister(task2.changelogPartitions());
verify(changelogReader).unregister(statefulTask.changelogPartitions());
verify(changelogReader).unregister(standbyTask.changelogPartitions());
}
@Test
public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
public void shouldThrowIfRemovingPausedTaskFails() throws Exception {
final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", statefulTask.id());
final Collection<TopicPartition> changelogPartitions = statefulTask.changelogPartitions();
doThrow(streamsException).when(changelogReader).unregister(changelogPartitions);
stateUpdater.start();
stateUpdater.add(statefulTask);
verifyUpdatingTasks(statefulTask);
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyPausedTasks(statefulTask);
verifyUpdatingTasks();
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(statefulTask.id());
final ExecutionException executionException = assertThrows(ExecutionException.class, future::get);
assertInstanceOf(StreamsException.class, executionException.getCause());
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyPausedTasks();
verifyExceptionsAndFailedTasks(new ExceptionAndTask(streamsException, statefulTask));
assertTrue(stateUpdater.isRunning());
}
@Test
public void shouldRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotRemoveTaskFromRestoredActiveTasks(task);
shouldRemoveTaskFromRestoredActiveTasks(task);
}
@Test
public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
public void shouldRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = statelessTask(TASK_0_0).inState(State.RESTORING).build();
shouldNotRemoveTaskFromRestoredActiveTasks(task);
shouldRemoveTaskFromRestoredActiveTasks(task);
}
private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
private void shouldRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.add(controlTask);
verifyRestoredActiveTasks(task);
stateUpdater.remove(task.id());
stateUpdater.remove(controlTask.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
future.get();
verifyRemovedTasks(controlTask);
verifyRestoredActiveTasks(task);
assertEquals(new StateUpdater.RemovedTaskResult(task), future.get());
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyPausedTasks();
verifyExceptionsAndFailedTasks();
}
@Test
public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
public void shouldRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotRemoveTaskFromFailedTasks(task);
shouldRemoveTaskFromFailedTasks(task);
}
@Test
public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
public void shouldRemoveStandbyTaskFromFailedTasks() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotRemoveTaskFromFailedTasks(task);
shouldRemoveTaskFromFailedTasks(task);
}
private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
private void shouldRemoveTaskFromFailedTasks(final Task task) throws Exception {
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task.id(), task),
mkEntry(controlTask.id(), controlTask)
);
final Map<TaskId, Task> updatingTasks = mkMap(mkEntry(task.id(), task));
doThrow(streamsException)
.doReturn(0L)
.when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.add(controlTask);
final ExceptionAndTask expectedExceptionAndTasks = new ExceptionAndTask(streamsException, task);
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
stateUpdater.remove(task.id());
stateUpdater.remove(controlTask.id());
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(task.id());
verifyRemovedTasks(controlTask);
assertEquals(new StateUpdater.RemovedTaskResult(task, streamsException), future.get());
verifyPausedTasks();
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
verifyExceptionsAndFailedTasks();
verifyUpdatingTasks();
verifyRestoredActiveTasks();
}
@Test
public void shouldCompleteWithNullIfTaskNotFound() throws Exception {
final StreamTask updatingTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask restoredTask = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamTask failedTask = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(TASK_0_2));
doThrow(taskCorruptedException).when(changelogReader).restore(mkMap(
mkEntry(TASK_0_0, updatingTask),
mkEntry(TASK_0_2, failedTask)
));
when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(updatingTask);
stateUpdater.add(restoredTask);
stateUpdater.add(failedTask);
verifyRestoredActiveTasks(restoredTask);
verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedTask));
verifyUpdatingTasks(updatingTask);
verifyPausedTasks();
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(TASK_1_0);
assertNull(future.get());
verifyRestoredActiveTasks(restoredTask);
verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedTask));
verifyUpdatingTasks(updatingTask);
verifyPausedTasks();
}
@Test
public void shouldCompleteWithNullIfNoTasks() throws Exception {
stateUpdater.start();
final CompletableFuture<StateUpdater.RemovedTaskResult> future = stateUpdater.removeWithFuture(TASK_0_1);
assertNull(future.get());
assertTrue(stateUpdater.isRunning());
}
@Test
public void shouldPauseActiveStatefulTask() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
@ -890,7 +1007,6 @@ class DefaultStateUpdaterTest {
verifyPausedTasks(task1);
verifyCheckpointTasks(true, task1);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyUpdatingTasks(task2);
verifyExceptionsAndFailedTasks();
verify(changelogReader, times(1)).enforceRestoreActive();
@ -924,7 +1040,6 @@ class DefaultStateUpdaterTest {
verifyPausedTasks(task);
verifyCheckpointTasks(true, task);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
}
@ -936,7 +1051,6 @@ class DefaultStateUpdaterTest {
verifyPausedTasks();
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
}
@ -1003,40 +1117,6 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
}
@Test
public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
private void shouldNotPauseTaskInRemovedTasks(final Task task) throws Exception {
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
stateUpdater.remove(task.id());
verifyRemovedTasks(task);
verifyCheckpointTasks(true, task);
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyPausedTasks();
verifyExceptionsAndFailedTasks();
when(topologyMetadata.isPaused(null)).thenReturn(true);
verifyRemovedTasks(task);
verifyUpdatingTasks();
verifyPausedTasks();
}
@Test
public void shouldResumeActiveStatefulTask() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
@ -1092,7 +1172,6 @@ class DefaultStateUpdaterTest {
verifyPausedTasks();
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
}
@ -1115,35 +1194,6 @@ class DefaultStateUpdaterTest {
verifyExceptionsAndFailedTasks();
}
@Test
public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception {
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotResumeTaskInRemovedTasks(task);
}
private void shouldNotResumeTaskInRemovedTasks(final Task task) throws Exception {
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(task);
verifyUpdatingTasks(task);
verifyExceptionsAndFailedTasks();
stateUpdater.remove(task.id());
verifyRemovedTasks(task);
verifyUpdatingTasks();
verifyUpdatingTasks();
}
@Test
public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
@ -1162,12 +1212,12 @@ class DefaultStateUpdaterTest {
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task.id(), task),
mkEntry(controlTask.id(), controlTask)
mkEntry(task.id(), task),
mkEntry(controlTask.id(), controlTask)
);
doThrow(streamsException)
.doReturn(0L)
.when(changelogReader).restore(updatingTasks);
.doReturn(0L)
.when(changelogReader).restore(updatingTasks);
stateUpdater.start();
stateUpdater.add(task);
@ -1180,33 +1230,6 @@ class DefaultStateUpdaterTest {
verifyUpdatingTasks(controlTask);
}
@Test
public void shouldDrainRemovedTasks() throws Exception {
assertFalse(stateUpdater.hasRemovedTasks());
assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
stateUpdater.add(task1);
stateUpdater.remove(task1.id());
verifyDrainingRemovedTasks(task1);
final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build();
stateUpdater.add(task2);
stateUpdater.remove(task2.id());
stateUpdater.add(task3);
stateUpdater.remove(task3.id());
stateUpdater.add(task4);
stateUpdater.remove(task4.id());
verifyDrainingRemovedTasks(task2, task3, task4);
}
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
@ -1226,10 +1249,10 @@ class DefaultStateUpdaterTest {
final ExceptionAndTask expectedExceptionAndTask1 = new ExceptionAndTask(streamsException, task1);
final ExceptionAndTask expectedExceptionAndTask2 = new ExceptionAndTask(streamsException, task2);
verifyExceptionsAndFailedTasks(expectedExceptionAndTask1, expectedExceptionAndTask2);
verifyRemovedTasks();
verifyPausedTasks();
verifyUpdatingTasks();
verifyRestoredActiveTasks();
assertTrue(stateUpdater.isRunning());
}
@Test
@ -1266,8 +1289,8 @@ class DefaultStateUpdaterTest {
verifyExceptionsAndFailedTasks(expectedExceptionAndTasks1, expectedExceptionAndTasks2);
verifyUpdatingTasks(task2);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertTrue(stateUpdater.isRunning());
}
@Test
@ -1294,10 +1317,10 @@ class DefaultStateUpdaterTest {
verifyExceptionsAndFailedTasks(expectedExceptionAndTask1, expectedExceptionAndTask2);
verifyUpdatingTasks(task3);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verify(changelogReader).unregister(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
verify(task1).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_A_0));
verify(task2).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_B_0));
assertTrue(stateUpdater.isRunning());
}
@Test
@ -1320,8 +1343,8 @@ class DefaultStateUpdaterTest {
verifyExceptionsAndFailedTasks(expectedExceptionAndTask1, expectedExceptionAndTask2);
verifyUpdatingTasks();
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyPausedTasks();
assertFalse(stateUpdater.isRunning());
}
@Test
@ -1447,7 +1470,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(activeTask1);
stateUpdater.add(standbyTask1);
stateUpdater.add(standbyTask2);
stateUpdater.remove(TASK_0_0);
stateUpdater.removeWithFuture(TASK_0_0);
stateUpdater.add(activeTask2);
stateUpdater.add(standbyTask3);
@ -1526,29 +1549,6 @@ class DefaultStateUpdaterTest {
verifyGetTasks(mkSet(), mkSet());
}
@Test
public void shouldGetTasksFromRemovedTasks() throws Exception {
final StreamTask activeTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
stateUpdater.add(standbyTask1);
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask2);
stateUpdater.remove(standbyTask1.id());
stateUpdater.remove(standbyTask2.id());
stateUpdater.remove(activeTask.id());
verifyRemovedTasks(activeTask, standbyTask1, standbyTask2);
verifyGetTasks(mkSet(activeTask), mkSet(standbyTask1, standbyTask2));
stateUpdater.drainRemovedTasks();
verifyGetTasks(mkSet(), mkSet());
}
@Test
public void shouldGetTasksFromPausedTasks() throws Exception {
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
@ -1769,28 +1769,6 @@ class DefaultStateUpdaterTest {
);
}
private void verifyRemovedTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
waitForCondition(
() -> stateUpdater.getRemovedTasks().isEmpty(),
VERIFICATION_TIMEOUT,
"Did not get empty removed task within the given timeout!"
);
} else {
final Set<Task> expectedRemovedTasks = mkSet(tasks);
final Set<Task> removedTasks = new HashSet<>();
waitForCondition(
() -> {
removedTasks.addAll(stateUpdater.getRemovedTasks());
return removedTasks.containsAll(expectedRemovedTasks)
&& removedTasks.size() == expectedRemovedTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all removed task within the given timeout!"
);
}
}
private void verifyIdle() throws Exception {
waitForCondition(
() -> stateUpdater.isIdle(),
@ -1821,26 +1799,6 @@ class DefaultStateUpdaterTest {
}
}
private void verifyDrainingRemovedTasks(final Task... tasks) throws Exception {
final Set<Task> expectedRemovedTasks = mkSet(tasks);
final Set<Task> removedTasks = new HashSet<>();
waitForCondition(
() -> {
if (stateUpdater.hasRemovedTasks()) {
final Set<Task> drainedTasks = stateUpdater.drainRemovedTasks();
assertFalse(drainedTasks.isEmpty());
removedTasks.addAll(drainedTasks);
}
return removedTasks.containsAll(mkSet(tasks))
&& removedTasks.size() == expectedRemovedTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all restored active task within the given timeout!"
);
assertFalse(stateUpdater.hasRemovedTasks());
assertTrue(stateUpdater.drainRemovedTasks().isEmpty());
}
private void verifyExceptionsAndFailedTasks(final ExceptionAndTask... exceptionsAndTasks) throws Exception {
final List<ExceptionAndTask> expectedExceptionAndTasks = Arrays.asList(exceptionsAndTasks);
final Set<ExceptionAndTask> failedTasks = new HashSet<>();
@ -1892,4 +1850,26 @@ class DefaultStateUpdaterTest {
assertFalse(stateUpdater.hasExceptionsAndFailedTasks());
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
}
private void verifyRemovedTasks(final Task... tasks) throws Exception {
if (tasks.length == 0) {
waitForCondition(
() -> stateUpdater.getRemovedTasks().isEmpty(),
VERIFICATION_TIMEOUT,
"Did not get empty removed task within the given timeout!"
);
} else {
final Set<Task> expectedRemovedTasks = mkSet(tasks);
final Set<Task> removedTasks = new HashSet<>();
waitForCondition(
() -> {
removedTasks.addAll(stateUpdater.getRemovedTasks());
return removedTasks.containsAll(expectedRemovedTasks)
&& removedTasks.size() == expectedRemovedTasks.size();
},
VERIFICATION_TIMEOUT,
"Did not get all removed task within the given timeout!"
);
}
}
}

View File

@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.ADD;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.Action.REMOVE;
import static org.apache.kafka.streams.processor.internals.TaskAndAction.createAddTask;
@ -36,22 +38,26 @@ class TaskAndActionTest {
final TaskAndAction addTask = createAddTask(task);
assertEquals(ADD, addTask.getAction());
assertEquals(task, addTask.getTask());
final Exception exception = assertThrows(IllegalStateException.class, addTask::getTaskId);
assertEquals("Action type ADD cannot have a task ID!", exception.getMessage());
assertEquals(ADD, addTask.action());
assertEquals(task, addTask.task());
final Exception exceptionForTaskId = assertThrows(IllegalStateException.class, addTask::taskId);
assertEquals("Action type ADD cannot have a task ID!", exceptionForTaskId.getMessage());
final Exception exceptionForFutureForRemove = assertThrows(IllegalStateException.class, addTask::futureForRemove);
assertEquals("Action type ADD cannot have a future with a single result!", exceptionForFutureForRemove.getMessage());
}
@Test
public void shouldCreateRemoveTaskAction() {
final TaskId taskId = new TaskId(0, 0);
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
final TaskAndAction removeTask = createRemoveTask(taskId);
final TaskAndAction removeTask = createRemoveTask(taskId, future);
assertEquals(REMOVE, removeTask.getAction());
assertEquals(taskId, removeTask.getTaskId());
final Exception exception = assertThrows(IllegalStateException.class, removeTask::getTask);
assertEquals("Action type REMOVE cannot have a task!", exception.getMessage());
assertEquals(REMOVE, removeTask.action());
assertEquals(taskId, removeTask.taskId());
assertEquals(future, removeTask.futureForRemove());
final Exception exceptionForTask = assertThrows(IllegalStateException.class, removeTask::task);
assertEquals("Action type REMOVE cannot have a task!", exceptionForTask.getMessage());
}
@Test
@ -62,7 +68,19 @@ class TaskAndActionTest {
@Test
public void shouldThrowIfRemoveTaskActionIsCreatedWithNullTaskId() {
final Exception exception = assertThrows(NullPointerException.class, () -> createRemoveTask(null));
final Exception exception = assertThrows(
NullPointerException.class,
() -> createRemoveTask(null, new CompletableFuture<>())
);
assertTrue(exception.getMessage().contains("Task ID of task to remove is null!"));
}
@Test
public void shouldThrowIfRemoveTaskActionIsCreatedWithNullFuture() {
final Exception exception = assertThrows(
NullPointerException.class,
() -> createRemoveTask(new TaskId(0, 0), null)
);
assertTrue(exception.getMessage().contains("Future for task to remove is null!"));
}
}