MINOR: Use builder for mock task in DefaultStateUpdaterTest (#12436)

Reviewer: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bruno Cadonna 2022-07-26 10:12:20 +02:00 committed by GitHub
parent a450fb70c1
commit f191e4781e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 125 additions and 170 deletions

View File

@ -45,6 +45,9 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.easymock.EasyMock.anyBoolean;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -123,12 +126,12 @@ class DefaultStateUpdaterTest {
@Test
public void shouldThrowIfStatelessTaskNotInStateRestoring() {
shouldThrowIfActiveTaskNotInStateRestoring(createStatelessTask(TASK_0_0));
shouldThrowIfActiveTaskNotInStateRestoring(statelessTask(TASK_0_0).build());
}
@Test
public void shouldThrowIfStatefulTaskNotInStateRestoring() {
shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)));
shouldThrowIfActiveTaskNotInStateRestoring(statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build());
}
private void shouldThrowIfActiveTaskNotInStateRestoring(final StreamTask task) {
@ -137,7 +140,7 @@ class DefaultStateUpdaterTest {
@Test
public void shouldThrowIfStandbyTaskNotInStateRunning() {
final StandbyTask task = createStandbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).build();
shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
}
@ -152,29 +155,29 @@ class DefaultStateUpdaterTest {
@Test
public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
shouldThrowIfAddingTasksWithSameId(task2, task1);
}
@ -188,15 +191,15 @@ class DefaultStateUpdaterTest {
@Test
public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception {
final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
shouldImmediatelyAddStatelessTasksToRestoredTasks(task1);
}
@Test
public void shouldImmediatelyAddMultipleStatelessTasksToRestoredTasks() throws Exception {
final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
final StreamTask task2 = createStatelessTaskInStateRestoring(TASK_0_2);
final StreamTask task3 = createStatelessTaskInStateRestoring(TASK_1_0);
final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
final StreamTask task2 = statelessTask(TASK_0_2).inState(State.RESTORING).build();
final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build();
shouldImmediatelyAddStatelessTasksToRestoredTasks(task1, task2, task3);
}
@ -217,7 +220,7 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreSingleActiveStatefulTask() throws Exception {
final StreamTask task =
createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
@ -243,9 +246,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_C_0))
@ -277,15 +280,15 @@ class DefaultStateUpdaterTest {
public void shouldDrainRestoredActiveTasks() throws Exception {
assertTrue(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
stateUpdater.start();
stateUpdater.add(task1);
verifyDrainingRestoredActiveTasks(task1);
final StreamTask task2 = createStatelessTaskInStateRestoring(TASK_1_1);
final StreamTask task3 = createStatelessTaskInStateRestoring(TASK_1_0);
final StreamTask task4 = createStatelessTaskInStateRestoring(TASK_0_2);
final StreamTask task2 = statelessTask(TASK_1_1).inState(State.RESTORING).build();
final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build();
final StreamTask task4 = statelessTask(TASK_0_2).inState(State.RESTORING).build();
stateUpdater.add(task2);
stateUpdater.add(task3);
stateUpdater.add(task4);
@ -295,18 +298,16 @@ class DefaultStateUpdaterTest {
@Test
public void shouldUpdateSingleStandbyTask() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(
TASK_0_0,
mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)
);
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
.inState(State.RUNNING).build();
shouldUpdateStandbyTasks(task);
}
@Test
public void shouldUpdateMultipleStandbyTasks() throws Exception {
final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
shouldUpdateStandbyTasks(task1, task2, task3);
}
@ -331,10 +332,10 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
@ -361,9 +362,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StreamTask task3 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
@ -391,9 +392,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final TaskCorruptedException taskCorruptedException =
new TaskCorruptedException(mkSet(activeTask1.id(), activeTask2.id()));
final Map<TaskId, Task> updatingTasks1 = mkMap(
@ -419,9 +420,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -441,13 +442,13 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRemoveActiveStatefulTask() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldRemoveStatefulTask(task);
}
@Test
public void shouldRemoveStandbyTask() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldRemoveStatefulTask(task);
}
@ -470,8 +471,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRemovePausedTask() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(task1);
@ -496,18 +497,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotRemoveTaskFromRestoredActiveTasks(task);
}
@Test
public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
final StreamTask task = statelessTask(TASK_0_0).inState(State.RESTORING).build();
shouldNotRemoveTaskFromRestoredActiveTasks(task);
}
private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -527,18 +528,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotRemoveTaskFromFailedTasks(task);
}
@Test
public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotRemoveTaskFromFailedTasks(task);
}
private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@ -568,22 +569,22 @@ class DefaultStateUpdaterTest {
@Test
public void shouldPauseActiveStatefulTask() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldPauseStatefulTask(task);
verify(changelogReader, never()).transitToUpdateStandby();
}
@Test
public void shouldPauseStandbyTask() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldPauseStatefulTask(task);
verify(changelogReader, times(1)).transitToUpdateStandby();
}
@Test
public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(task1);
@ -629,8 +630,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -649,18 +650,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInFailedTasks(task);
}
@Test
public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotPauseTaskInFailedTasks(task);
}
private void shouldNotPauseTaskInFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@ -689,13 +690,13 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@ -724,14 +725,14 @@ class DefaultStateUpdaterTest {
@Test
public void shouldResumeActiveStatefulTask() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldResumeStatefulTask(task);
verify(changelogReader, times(2)).enforceRestoreActive();
}
@Test
public void shouldResumeStandbyTask() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldResumeStatefulTask(task);
verify(changelogReader, times(2)).transitToUpdateStandby();
}
@ -765,8 +766,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -786,13 +787,13 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotResumeTaskInRemovedTasks(task);
}
@ -817,18 +818,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInFailedTasks(task);
}
@Test
public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotResumeTaskInFailedTasks(task);
}
private void shouldNotResumeTaskInFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@ -861,15 +862,15 @@ class DefaultStateUpdaterTest {
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
stateUpdater.add(task1);
stateUpdater.remove(task1.id());
verifyDrainingRemovedTasks(task1);
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0));
final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build();
stateUpdater.add(task2);
stateUpdater.remove(task2.id());
stateUpdater.add(task3);
@ -882,8 +883,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException = new StreamsException(exceptionMessage);
final Map<TaskId, Task> updatingTasks = mkMap(
@ -906,9 +907,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
final StreamsException streamsException2 = new StreamsException(exceptionMessage, task3.id());
@ -944,9 +945,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final Set<TaskId> expectedTaskIds = mkSet(task1.id(), task2.id());
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(expectedTaskIds);
final Map<TaskId, Task> updatingTasks = mkMap(
@ -970,8 +971,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!");
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
@ -995,10 +996,10 @@ class DefaultStateUpdaterTest {
public void shouldDrainFailedTasksAndExceptions() throws Exception {
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build();
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
final Map<TaskId, Task> updatingTasks1 = mkMap(
@ -1043,10 +1044,10 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAutoCheckpointTasksOnInterval() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1069,10 +1070,10 @@ class DefaultStateUpdaterTest {
final Time time = new MockTime();
final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
try {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1105,11 +1106,11 @@ class DefaultStateUpdaterTest {
public void shouldGetTasksFromInputQueue() {
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
stateUpdater.add(activeTask1);
stateUpdater.add(standbyTask1);
stateUpdater.add(standbyTask2);
@ -1135,11 +1136,11 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromUpdatingTasks() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1168,8 +1169,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromRestoredActiveTasks() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1186,9 +1187,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
final StreamTask activeTask1 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
final TaskCorruptedException taskCorruptedException =
new TaskCorruptedException(mkSet(standbyTask1.id(), standbyTask2.id()));
final StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask1.id());
@ -1220,9 +1221,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromRemovedTasks() throws Exception {
final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
final StreamTask activeTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1243,8 +1244,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromPausedTasks() throws Exception {
final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
@ -1453,55 +1454,4 @@ class DefaultStateUpdaterTest {
);
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
}
private StreamTask createActiveStatefulTaskInStateRestoring(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StreamTask task = createActiveStatefulTask(taskId, changelogPartitions);
when(task.state()).thenReturn(State.RESTORING);
return task;
}
private StreamTask createActiveStatefulTask(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StreamTask task = mock(StreamTask.class);
setupStatefulTask(task, taskId, changelogPartitions);
when(task.isActive()).thenReturn(true);
return task;
}
private StreamTask createStatelessTaskInStateRestoring(final TaskId taskId) {
final StreamTask task = createStatelessTask(taskId);
when(task.state()).thenReturn(State.RESTORING);
return task;
}
private StreamTask createStatelessTask(final TaskId taskId) {
final StreamTask task = mock(StreamTask.class);
when(task.changelogPartitions()).thenReturn(Collections.emptySet());
when(task.isActive()).thenReturn(true);
when(task.id()).thenReturn(taskId);
return task;
}
private StandbyTask createStandbyTaskInStateRunning(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StandbyTask task = createStandbyTask(taskId, changelogPartitions);
when(task.state()).thenReturn(State.RUNNING);
return task;
}
private StandbyTask createStandbyTask(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StandbyTask task = mock(StandbyTask.class);
setupStatefulTask(task, taskId, changelogPartitions);
when(task.isActive()).thenReturn(false);
return task;
}
private void setupStatefulTask(final Task task,
final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
when(task.changelogPartitions()).thenReturn(changelogPartitions);
when(task.id()).thenReturn(taskId);
}
}

View File

@ -343,6 +343,11 @@ public final class StreamsTestUtils {
when(task.id()).thenReturn(taskId);
}
public TaskBuilder<T> inState(final Task.State state) {
when(task.state()).thenReturn(state);
return this;
}
public T build() {
return task;
}