From 5a52601691478857472333d8a13a07b09a7d13c5 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Thu, 21 Jul 2022 21:37:17 +0200 Subject: [PATCH] KAFKA-10199: Add tasks to state updater when they are created (#12427) This PR introduces an internal config to enable the state updater. If the state updater is enabled newly created tasks are added to the state updater. Additionally, this PR introduces a builder for mocks for tasks. Reviewers: Guozhang Wang --- .../apache/kafka/streams/StreamsConfig.java | 3 + .../processor/internals/AbstractTask.java | 2 +- .../internals/ProcessorStateManager.java | 3 +- .../processor/internals/StreamThread.java | 4 +- .../streams/processor/internals/Task.java | 2 +- .../processor/internals/TaskManager.java | 18 +- .../streams/processor/internals/Tasks.java | 25 +- .../internals/DefaultStateUpdaterTest.java | 223 +++++++++--------- .../processor/internals/StreamThreadTest.java | 6 +- .../processor/internals/TaskManagerTest.java | 27 ++- .../processor/internals/TasksTest.java | 133 +++++++++++ .../apache/kafka/test/StreamsTestUtils.java | 74 ++++++ 12 files changed, 381 insertions(+), 139 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 9975bd4680d..8976a0983fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1082,6 +1082,9 @@ public class StreamsConfig extends AbstractConfig { // Private API used to control the prefix of the auto created topics public static final String TOPIC_PREFIX_ALTERNATIVE = "__internal.override.topic.prefix__"; + // Private API to enable the state updater (i.e. state updating on a dedicated thread) + public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__"; + public static boolean getBoolean(final Map configs, final String key, final boolean defaultValue) { final Object value = configs.getOrDefault(key, defaultValue); if (value instanceof Boolean) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index f8476b3e8b1..8ffbf9cd2ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -112,7 +112,7 @@ public abstract class AbstractTask implements Task { } @Override - public Collection changelogPartitions() { + public Set changelogPartitions() { return stateMgr.changelogPartitions(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 6efd3124ef6..2a67a568a21 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -45,6 +45,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static java.lang.String.format; @@ -364,7 +365,7 @@ public class ProcessorStateManager implements StateManager { } } - Collection changelogPartitions() { + Set changelogPartitions() { return changelogOffsets().keySet(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 74a49a7e8e7..ee9c67920a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -384,6 +384,7 @@ public class StreamThread extends Thread { threadId, log ); + final TaskManager taskManager = new TaskManager( time, changelogReader, @@ -393,7 +394,8 @@ public class StreamThread extends Thread { standbyTaskCreator, topologyMetadata, adminClient, - stateDirectory + stateDirectory, + config ); referenceContainer.taskManager = taskManager; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index a17b19997b1..5402c8e7614 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -212,7 +212,7 @@ public interface Task { /** * @return any changelog partitions associated with this task */ - Collection changelogPartitions(); + Set changelogPartitions(); State state(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 4ea419ab9cc..f827cb5f689 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -29,6 +29,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; @@ -93,6 +95,9 @@ public class TaskManager { // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance private final Set lockedTaskDirectories = new HashSet<>(); + private final StateUpdater stateUpdater; + + TaskManager(final Time time, final ChangelogReader changelogReader, final UUID processId, @@ -101,7 +106,8 @@ public class TaskManager { final StandbyTaskCreator standbyTaskCreator, final TopologyMetadata topologyMetadata, final Admin adminClient, - final StateDirectory stateDirectory) { + final StateDirectory stateDirectory, + final StreamsConfig streamsConfig) { this.time = time; this.changelogReader = changelogReader; this.processId = processId; @@ -114,7 +120,15 @@ public class TaskManager { final LogContext logContext = new LogContext(logPrefix); this.log = logContext.logger(getClass()); - this.tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator); + final boolean stateUpdaterEnabled = + InternalConfig.getBoolean(streamsConfig.originals(), InternalConfig.STATE_UPDATER_ENABLED, false); + if (stateUpdaterEnabled) { + stateUpdater = new DefaultStateUpdater(streamsConfig, changelogReader, time); + stateUpdater.start(); + } else { + stateUpdater = null; + } + this.tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, stateUpdater); this.taskExecutor = new TaskExecutor( tasks, topologyMetadata.taskExecutionMetadata(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index fbb45c59403..2904eca0683 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -71,16 +71,19 @@ class Tasks { private final ActiveTaskCreator activeTaskCreator; private final StandbyTaskCreator standbyTaskCreator; + private final StateUpdater stateUpdater; private Consumer mainConsumer; Tasks(final LogContext logContext, final ActiveTaskCreator activeTaskCreator, - final StandbyTaskCreator standbyTaskCreator) { + final StandbyTaskCreator standbyTaskCreator, + final StateUpdater stateUpdater) { this.log = logContext.logger(getClass()); this.activeTaskCreator = activeTaskCreator; this.standbyTaskCreator = standbyTaskCreator; + this.stateUpdater = stateUpdater; } void setMainConsumer(final Consumer mainConsumer) { @@ -136,10 +139,14 @@ class Tasks { if (!activeTasksToCreate.isEmpty()) { for (final Task activeTask : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) { - activeTasksPerId.put(activeTask.id(), activeTask); - pendingActiveTasks.remove(activeTask.id()); - for (final TopicPartition topicPartition : activeTask.inputPartitions()) { - activeTasksPerPartition.put(topicPartition, activeTask); + if (stateUpdater != null) { + stateUpdater.add(activeTask); + } else { + activeTasksPerId.put(activeTask.id(), activeTask); + pendingActiveTasks.remove(activeTask.id()); + for (final TopicPartition topicPartition : activeTask.inputPartitions()) { + activeTasksPerPartition.put(topicPartition, activeTask); + } } } } @@ -160,8 +167,12 @@ class Tasks { if (!standbyTasksToCreate.isEmpty()) { for (final Task standbyTask : standbyTaskCreator.createTasks(standbyTasksToCreate)) { - standbyTasksPerId.put(standbyTask.id(), standbyTask); - pendingActiveTasks.remove(standbyTask.id()); + if (stateUpdater != null) { + stateUpdater.add(standbyTask); + } else { + standbyTasksPerId.put(standbyTask.id(), standbyTask); + pendingActiveTasks.remove(standbyTask.id()); + } } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index a6543bd620b..087559742b7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -33,7 +33,6 @@ import org.mockito.InOrder; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -129,7 +128,7 @@ class DefaultStateUpdaterTest { @Test public void shouldThrowIfStatefulTaskNotInStateRestoring() { - shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0))); + shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0))); } private void shouldThrowIfActiveTaskNotInStateRestoring(final StreamTask task) { @@ -138,7 +137,7 @@ class DefaultStateUpdaterTest { @Test public void shouldThrowIfStandbyTaskNotInStateRunning() { - final StandbyTask task = createStandbyTask(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task = createStandbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); shouldThrowIfTaskNotInGivenState(task, State.RUNNING); } @@ -153,29 +152,29 @@ class DefaultStateUpdaterTest { @Test public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldThrowIfAddingTasksWithSameId(task1, task2); } @Test public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception { - final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); shouldThrowIfAddingTasksWithSameId(task1, task2); } @Test public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); shouldThrowIfAddingTasksWithSameId(task1, task2); } @Test public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); shouldThrowIfAddingTasksWithSameId(task2, task1); } @@ -218,7 +217,7 @@ class DefaultStateUpdaterTest { @Test public void shouldRestoreSingleActiveStatefulTask() throws Exception { final StreamTask task = - createActiveStatefulTaskInStateRestoring(TASK_0_0, Arrays.asList(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)); + createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)); when(changelogReader.completedChangelogs()) .thenReturn(Collections.emptySet()) .thenReturn(mkSet(TOPIC_PARTITION_A_0)) @@ -244,9 +243,9 @@ class DefaultStateUpdaterTest { @Test public void shouldRestoreMultipleActiveStatefulTasks() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); when(changelogReader.completedChangelogs()) .thenReturn(Collections.emptySet()) .thenReturn(mkSet(TOPIC_PARTITION_C_0)) @@ -298,16 +297,16 @@ class DefaultStateUpdaterTest { public void shouldUpdateSingleStandbyTask() throws Exception { final StandbyTask task = createStandbyTaskInStateRunning( TASK_0_0, - Arrays.asList(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0) + mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0) ); shouldUpdateStandbyTasks(task); } @Test public void shouldUpdateMultipleStandbyTasks() throws Exception { - final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); shouldUpdateStandbyTasks(task1, task2, task3); } @@ -332,10 +331,10 @@ class DefaultStateUpdaterTest { @Test public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); - final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); when(changelogReader.completedChangelogs()) .thenReturn(Collections.emptySet()) .thenReturn(mkSet(TOPIC_PARTITION_A_0)) @@ -362,9 +361,9 @@ class DefaultStateUpdaterTest { @Test public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); when(changelogReader.completedChangelogs()) .thenReturn(Collections.emptySet()) .thenReturn(mkSet(TOPIC_PARTITION_A_0)) @@ -392,9 +391,9 @@ class DefaultStateUpdaterTest { @Test public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(activeTask1.id(), activeTask2.id())); final Map updatingTasks1 = mkMap( @@ -420,9 +419,9 @@ class DefaultStateUpdaterTest { @Test public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -442,13 +441,13 @@ class DefaultStateUpdaterTest { @Test public void shouldRemoveActiveStatefulTask() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldRemoveStatefulTask(task); } @Test public void shouldRemoveStandbyTask() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldRemoveStatefulTask(task); } @@ -471,8 +470,8 @@ class DefaultStateUpdaterTest { @Test public void shouldRemovePausedTask() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)); stateUpdater.start(); stateUpdater.add(task1); @@ -497,7 +496,7 @@ class DefaultStateUpdaterTest { @Test public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotRemoveTaskFromRestoredActiveTasks(task); } @@ -508,7 +507,7 @@ class DefaultStateUpdaterTest { } private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception { - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -528,18 +527,18 @@ class DefaultStateUpdaterTest { @Test public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotRemoveTaskFromFailedTasks(task); } @Test public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotRemoveTaskFromFailedTasks(task); } private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception { - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); final StreamsException streamsException = new StreamsException("Something happened", task.id()); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); @@ -569,22 +568,22 @@ class DefaultStateUpdaterTest { @Test public void shouldPauseActiveStatefulTask() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldPauseStatefulTask(task); verify(changelogReader, never()).transitToUpdateStandby(); } @Test public void shouldPauseStandbyTask() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldPauseStatefulTask(task); verify(changelogReader, times(1)).transitToUpdateStandby(); } @Test public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)); stateUpdater.start(); stateUpdater.add(task1); @@ -630,8 +629,8 @@ class DefaultStateUpdaterTest { @Test public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -650,18 +649,18 @@ class DefaultStateUpdaterTest { @Test public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotPauseTaskInFailedTasks(task); } @Test public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotPauseTaskInFailedTasks(task); } private void shouldNotPauseTaskInFailedTasks(final Task task) throws Exception { - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); final StreamsException streamsException = new StreamsException("Something happened", task.id()); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); @@ -690,13 +689,13 @@ class DefaultStateUpdaterTest { @Test public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotPauseTaskInRemovedTasks(task); } @Test public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotPauseTaskInRemovedTasks(task); } @@ -725,14 +724,14 @@ class DefaultStateUpdaterTest { @Test public void shouldResumeActiveStatefulTask() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldResumeStatefulTask(task); verify(changelogReader, times(2)).enforceRestoreActive(); } @Test public void shouldResumeStandbyTask() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldResumeStatefulTask(task); verify(changelogReader, times(2)).transitToUpdateStandby(); } @@ -766,8 +765,8 @@ class DefaultStateUpdaterTest { @Test public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -787,13 +786,13 @@ class DefaultStateUpdaterTest { @Test public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotPauseTaskInRemovedTasks(task); } @Test public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotResumeTaskInRemovedTasks(task); } @@ -818,18 +817,18 @@ class DefaultStateUpdaterTest { @Test public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception { - final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotPauseTaskInFailedTasks(task); } @Test public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception { - final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); shouldNotResumeTaskInFailedTasks(task); } private void shouldNotResumeTaskInFailedTasks(final Task task) throws Exception { - final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); final StreamsException streamsException = new StreamsException("Something happened", task.id()); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); @@ -862,15 +861,15 @@ class DefaultStateUpdaterTest { when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); stateUpdater.add(task1); stateUpdater.remove(task1.id()); verifyDrainingRemovedTasks(task1); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)); + final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)); stateUpdater.add(task2); stateUpdater.remove(task2.id()); stateUpdater.add(task3); @@ -883,8 +882,8 @@ class DefaultStateUpdaterTest { @Test public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); final String exceptionMessage = "The Streams were crossed!"; final StreamsException streamsException = new StreamsException(exceptionMessage); final Map updatingTasks = mkMap( @@ -907,9 +906,9 @@ class DefaultStateUpdaterTest { @Test public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); final String exceptionMessage = "The Streams were crossed!"; final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id()); final StreamsException streamsException2 = new StreamsException(exceptionMessage, task3.id()); @@ -945,9 +944,9 @@ class DefaultStateUpdaterTest { @Test public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); final Set expectedTaskIds = mkSet(task1.id(), task2.id()); final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(expectedTaskIds); final Map updatingTasks = mkMap( @@ -971,8 +970,8 @@ class DefaultStateUpdaterTest { @Test public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); final IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!"); final Map updatingTasks = mkMap( mkEntry(task1.id(), task1), @@ -996,10 +995,10 @@ class DefaultStateUpdaterTest { public void shouldDrainFailedTasksAndExceptions() throws Exception { assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty()); - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0)); - final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)); + final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)); final String exceptionMessage = "The Streams were crossed!"; final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id()); final Map updatingTasks1 = mkMap( @@ -1044,10 +1043,10 @@ class DefaultStateUpdaterTest { @Test public void shouldAutoCheckpointTasksOnInterval() throws Exception { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); - final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1070,10 +1069,10 @@ class DefaultStateUpdaterTest { final Time time = new MockTime(); final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time); try { - final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0)); - final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); + final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)); + final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1106,11 +1105,11 @@ class DefaultStateUpdaterTest { public void shouldGetTasksFromInputQueue() { stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE)); - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0)); - final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); - final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1)); + final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)); + final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); + final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)); stateUpdater.add(activeTask1); stateUpdater.add(standbyTask1); stateUpdater.add(standbyTask2); @@ -1136,11 +1135,11 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromUpdatingTasks() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0)); - final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); - final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1)); + final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)); + final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); + final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1169,8 +1168,8 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromRestoredActiveTasks() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); + final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1187,9 +1186,9 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception { - final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); - final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1)); + final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); + final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)); final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(mkSet(standbyTask1.id(), standbyTask2.id())); final StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask1.id()); @@ -1221,9 +1220,9 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromRemovedTasks() throws Exception { - final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0)); - final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0)); - final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1)); + final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)); + final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)); + final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.allChangelogsCompleted()).thenReturn(false); stateUpdater.start(); @@ -1244,8 +1243,8 @@ class DefaultStateUpdaterTest { @Test public void shouldGetTasksFromPausedTasks() throws Exception { - final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); - final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_0)); + final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_0)); stateUpdater.start(); stateUpdater.add(activeTask); stateUpdater.add(standbyTask); @@ -1456,14 +1455,14 @@ class DefaultStateUpdaterTest { } private StreamTask createActiveStatefulTaskInStateRestoring(final TaskId taskId, - final Collection changelogPartitions) { + final Set changelogPartitions) { final StreamTask task = createActiveStatefulTask(taskId, changelogPartitions); when(task.state()).thenReturn(State.RESTORING); return task; } private StreamTask createActiveStatefulTask(final TaskId taskId, - final Collection changelogPartitions) { + final Set changelogPartitions) { final StreamTask task = mock(StreamTask.class); setupStatefulTask(task, taskId, changelogPartitions); when(task.isActive()).thenReturn(true); @@ -1485,14 +1484,14 @@ class DefaultStateUpdaterTest { } private StandbyTask createStandbyTaskInStateRunning(final TaskId taskId, - final Collection changelogPartitions) { + final Set changelogPartitions) { final StandbyTask task = createStandbyTask(taskId, changelogPartitions); when(task.state()).thenReturn(State.RUNNING); return task; } private StandbyTask createStandbyTask(final TaskId taskId, - final Collection changelogPartitions) { + final Set changelogPartitions) { final StandbyTask task = mock(StandbyTask.class); setupStatefulTask(task, taskId, changelogPartitions); when(task.isActive()).thenReturn(false); @@ -1501,7 +1500,7 @@ class DefaultStateUpdaterTest { private void setupStatefulTask(final Task task, final TaskId taskId, - final Collection changelogPartitions) { + final Set changelogPartitions) { when(task.changelogPartitions()).thenReturn(changelogPartitions); when(task.id()).thenReturn(taskId); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 2c096789f2f..cff917c64eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -756,7 +756,8 @@ public class StreamThreadTest { null, topologyMetadata, null, - null + null, + config ) { @Override int commit(final Collection tasksToCommit) { @@ -857,7 +858,8 @@ public class StreamThreadTest { standbyTaskCreator, topologyMetadata, null, - null + null, + config ) { @Override int commit(final Collection tasksToCommit) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index b3ffb29b1ae..e943938fecf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; @@ -92,6 +93,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.union; import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; +import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.eq; @@ -193,7 +195,8 @@ public class TaskManagerTest { standbyTaskCreator, topologyMetadata, adminClient, - stateDirectory + stateDirectory, + new StreamsConfig(getStreamsConfig("test-app")) ); taskManager.setMainConsumer(consumer); reset(topologyBuilder); @@ -1615,8 +1618,8 @@ public class TaskManagerTest { ); final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true) { @Override - public Collection changelogPartitions() { - return singletonList(changelog); + public Set changelogPartitions() { + return singleton(changelog); } }; final AtomicBoolean closedDirtyTask01 = new AtomicBoolean(false); @@ -1728,8 +1731,8 @@ public class TaskManagerTest { ); final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) { @Override - public Collection changelogPartitions() { - return singletonList(changelog); + public Set changelogPartitions() { + return singleton(changelog); } }; final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); @@ -1781,8 +1784,8 @@ public class TaskManagerTest { ); final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true) { @Override - public Collection changelogPartitions() { - return singletonList(changelog); + public Set changelogPartitions() { + return singleton(changelog); } }; @@ -1896,8 +1899,8 @@ public class TaskManagerTest { ); final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true) { @Override - public Collection changelogPartitions() { - return singletonList(changelog); + public Set changelogPartitions() { + return singleton(changelog); } }; final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true) { @@ -2734,8 +2737,8 @@ public class TaskManagerTest { public void shouldReturnFalseWhenThereAreStillNonRunningTasks() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) { @Override - public Collection changelogPartitions() { - return singletonList(new TopicPartition("fake", 0)); + public Set changelogPartitions() { + return singleton(new TopicPartition("fake", 0)); } }; @@ -3453,7 +3456,7 @@ public class TaskManagerTest { } @Override - public Collection changelogPartitions() { + public Set changelogPartitions() { return changelogOffsets.keySet(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java new file mode 100644 index 00000000000..09699776d01 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask; +import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask; +import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TasksTest { + + private final static TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0); + private final static TopicPartition TOPIC_PARTITION_A_1 = new TopicPartition("topicA", 1); + private final static TaskId TASK_0_0 = new TaskId(0, 0); + private final static TaskId TASK_0_1 = new TaskId(0, 1); + private final static TaskId TASK_1_0 = new TaskId(1, 0); + + private final LogContext logContext = new LogContext(); + private final ActiveTaskCreator activeTaskCreator = mock(ActiveTaskCreator.class); + private final StandbyTaskCreator standbyTaskCreator = mock(StandbyTaskCreator.class); + private final StateUpdater stateUpdater = mock(StateUpdater.class); + + private Consumer mainConsumer = null; + + @Test + public void shouldCreateTasksWithStateUpdater() { + final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, stateUpdater); + tasks.setMainConsumer(mainConsumer); + final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build(); + final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build(); + final StreamTask statelessTask = statelessTask(TASK_1_0).build(); + final Map> activeTasks = mkMap( + mkEntry(statefulTask.id(), statefulTask.changelogPartitions()), + mkEntry(statelessTask.id(), statelessTask.changelogPartitions()) + ); + final Map> standbyTasks = + mkMap(mkEntry(standbyTask.id(), standbyTask.changelogPartitions())); + when(activeTaskCreator.createTasks(mainConsumer, activeTasks)).thenReturn(Arrays.asList(statefulTask, statelessTask)); + when(standbyTaskCreator.createTasks(standbyTasks)).thenReturn(Collections.singletonList(standbyTask)); + + tasks.createTasks(activeTasks, standbyTasks); + + final Exception exceptionForStatefulTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statefulTask.id())); + assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTask.getMessage()); + assertFalse(tasks.activeTasks().contains(statefulTask)); + assertFalse(tasks.allTasks().contains(statefulTask)); + final Exception exceptionForStatefulTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statefulTask.id()))); + assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTasks.getMessage()); + final Exception exceptionForStatelessTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statelessTask.id())); + assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTask.getMessage()); + assertFalse(tasks.activeTasks().contains(statelessTask)); + assertFalse(tasks.allTasks().contains(statelessTask)); + final Exception exceptionForStatelessTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statelessTask.id()))); + assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTasks.getMessage()); + final Exception exceptionForStandbyTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(standbyTask.id())); + assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandbyTaskOnTask.getMessage()); + assertFalse(tasks.allTasks().contains(standbyTask)); + final Exception exceptionForStandByTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(standbyTask.id()))); + assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandByTaskOnTasks.getMessage()); + verify(activeTaskCreator).createTasks(mainConsumer, activeTasks); + verify(standbyTaskCreator).createTasks(standbyTasks); + verify(stateUpdater).add(statefulTask); + } + + @Test + public void shouldCreateTasksWithoutStateUpdater() { + final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, null); + tasks.setMainConsumer(mainConsumer); + final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build(); + final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build(); + final StreamTask statelessTask = statelessTask(TASK_1_0).build(); + final Map> activeTasks = mkMap( + mkEntry(statefulTask.id(), statefulTask.changelogPartitions()), + mkEntry(statelessTask.id(), statelessTask.changelogPartitions()) + ); + final Map> standbyTasks = + mkMap(mkEntry(standbyTask.id(), standbyTask.changelogPartitions())); + when(activeTaskCreator.createTasks(mainConsumer, activeTasks)).thenReturn(Arrays.asList(statefulTask, statelessTask)); + when(standbyTaskCreator.createTasks(standbyTasks)).thenReturn(Collections.singletonList(standbyTask)); + + tasks.createTasks(activeTasks, standbyTasks); + + assertEquals(statefulTask, tasks.task(statefulTask.id())); + assertTrue(tasks.activeTasks().contains(statefulTask)); + assertTrue(tasks.allTasks().contains(statefulTask)); + assertTrue(tasks.tasks(mkSet(statefulTask.id())).contains(statefulTask)); + assertEquals(statelessTask, tasks.task(statelessTask.id())); + assertTrue(tasks.activeTasks().contains(statelessTask)); + assertTrue(tasks.allTasks().contains(statelessTask)); + assertTrue(tasks.tasks(mkSet(statelessTask.id())).contains(statelessTask)); + assertEquals(standbyTask, tasks.task(standbyTask.id())); + assertTrue(tasks.allTasks().contains(standbyTask)); + assertTrue(tasks.tasks(mkSet(standbyTask.id())).contains(standbyTask)); + verify(activeTaskCreator).createTasks(mainConsumer, activeTasks); + verify(standbyTaskCreator).createTasks(standbyTasks); + verify(stateUpdater, never()).add(statefulTask); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 900898295b5..c88515e01d4 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -18,6 +18,7 @@ package org.apache.kafka.test; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; @@ -25,12 +26,17 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.StandbyTask; +import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.state.KeyValueIterator; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; @@ -48,6 +54,8 @@ import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public final class StreamsTestUtils { private StreamsTestUtils() {} @@ -273,4 +281,70 @@ public final class StreamsTestUtils { return Arrays.stream(Thread.currentThread().getStackTrace()) .anyMatch(caller -> "org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && "checkSupplier".equals(caller.getMethodName())); } + + public static StreamTask createStatefulTask(final TaskId taskId, + final Set changelogPartitions) { + final StreamTask task = mock(StreamTask.class); + setupStatefulTask(task, taskId, changelogPartitions); + when(task.isActive()).thenReturn(true); + return task; + } + + public static StandbyTask createStandbyTask(final TaskId taskId, + final Set changelogPartitions) { + final StandbyTask task = mock(StandbyTask.class); + setupStatefulTask(task, taskId, changelogPartitions); + when(task.isActive()).thenReturn(false); + return task; + } + + private static void setupStatefulTask(final Task task, + final TaskId taskId, + final Set changelogPartitions) { + when(task.changelogPartitions()).thenReturn(changelogPartitions); + when(task.id()).thenReturn(taskId); + } + + public static class TaskBuilder { + private final T task; + + private TaskBuilder(final T task) { + this.task = task; + } + + public static TaskBuilder statelessTask(final TaskId taskId) { + final StreamTask task = mock(StreamTask.class); + when(task.changelogPartitions()).thenReturn(Collections.emptySet()); + when(task.isActive()).thenReturn(true); + when(task.id()).thenReturn(taskId); + return new TaskBuilder<>(task); + } + + public static TaskBuilder statefulTask(final TaskId taskId, + final Set changelogPartitions) { + final StreamTask task = mock(StreamTask.class); + when(task.isActive()).thenReturn(true); + setupStatefulTask(task, taskId, changelogPartitions); + return new TaskBuilder<>(task); + } + + public static TaskBuilder standbyTask(final TaskId taskId, + final Set changelogPartitions) { + final StandbyTask task = mock(StandbyTask.class); + when(task.isActive()).thenReturn(false); + setupStatefulTask(task, taskId, changelogPartitions); + return new TaskBuilder<>(task); + } + + private static void setupStatefulTask(final Task task, + final TaskId taskId, + final Set changelogPartitions) { + when(task.changelogPartitions()).thenReturn(changelogPartitions); + when(task.id()).thenReturn(taskId); + } + + public T build() { + return task; + } + } }