KAFKA-10199: Add tasks to state updater when they are created (#12427)

This PR introduces an internal config to enable the state updater. If the state updater is enabled newly created tasks are added to the state updater. Additionally, this PR introduces a builder for mocks for tasks.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bruno Cadonna 2022-07-21 21:37:17 +02:00 committed by GitHub
parent 5e4ae06d12
commit 5a52601691
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 381 additions and 139 deletions

View File

@ -1082,6 +1082,9 @@ public class StreamsConfig extends AbstractConfig {
// Private API used to control the prefix of the auto created topics
public static final String TOPIC_PREFIX_ALTERNATIVE = "__internal.override.topic.prefix__";
// Private API to enable the state updater (i.e. state updating on a dedicated thread)
public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
public static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
final Object value = configs.getOrDefault(key, defaultValue);
if (value instanceof Boolean) {

View File

@ -112,7 +112,7 @@ public abstract class AbstractTask implements Task {
}
@Override
public Collection<TopicPartition> changelogPartitions() {
public Set<TopicPartition> changelogPartitions() {
return stateMgr.changelogPartitions();
}

View File

@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.String.format;
@ -364,7 +365,7 @@ public class ProcessorStateManager implements StateManager {
}
}
Collection<TopicPartition> changelogPartitions() {
Set<TopicPartition> changelogPartitions() {
return changelogOffsets().keySet();
}

View File

@ -384,6 +384,7 @@ public class StreamThread extends Thread {
threadId,
log
);
final TaskManager taskManager = new TaskManager(
time,
changelogReader,
@ -393,7 +394,8 @@ public class StreamThread extends Thread {
standbyTaskCreator,
topologyMetadata,
adminClient,
stateDirectory
stateDirectory,
config
);
referenceContainer.taskManager = taskManager;

View File

@ -212,7 +212,7 @@ public interface Task {
/**
* @return any changelog partitions associated with this task
*/
Collection<TopicPartition> changelogPartitions();
Set<TopicPartition> changelogPartitions();
State state();

View File

@ -29,6 +29,8 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
@ -93,6 +95,9 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
private final StateUpdater stateUpdater;
TaskManager(final Time time,
final ChangelogReader changelogReader,
final UUID processId,
@ -101,7 +106,8 @@ public class TaskManager {
final StandbyTaskCreator standbyTaskCreator,
final TopologyMetadata topologyMetadata,
final Admin adminClient,
final StateDirectory stateDirectory) {
final StateDirectory stateDirectory,
final StreamsConfig streamsConfig) {
this.time = time;
this.changelogReader = changelogReader;
this.processId = processId;
@ -114,7 +120,15 @@ public class TaskManager {
final LogContext logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
this.tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator);
final boolean stateUpdaterEnabled =
InternalConfig.getBoolean(streamsConfig.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
if (stateUpdaterEnabled) {
stateUpdater = new DefaultStateUpdater(streamsConfig, changelogReader, time);
stateUpdater.start();
} else {
stateUpdater = null;
}
this.tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, stateUpdater);
this.taskExecutor = new TaskExecutor(
tasks,
topologyMetadata.taskExecutionMetadata(),

View File

@ -71,16 +71,19 @@ class Tasks {
private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
private final StateUpdater stateUpdater;
private Consumer<byte[], byte[]> mainConsumer;
Tasks(final LogContext logContext,
final ActiveTaskCreator activeTaskCreator,
final StandbyTaskCreator standbyTaskCreator) {
final StandbyTaskCreator standbyTaskCreator,
final StateUpdater stateUpdater) {
this.log = logContext.logger(getClass());
this.activeTaskCreator = activeTaskCreator;
this.standbyTaskCreator = standbyTaskCreator;
this.stateUpdater = stateUpdater;
}
void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
@ -136,10 +139,14 @@ class Tasks {
if (!activeTasksToCreate.isEmpty()) {
for (final Task activeTask : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
activeTasksPerId.put(activeTask.id(), activeTask);
pendingActiveTasks.remove(activeTask.id());
for (final TopicPartition topicPartition : activeTask.inputPartitions()) {
activeTasksPerPartition.put(topicPartition, activeTask);
if (stateUpdater != null) {
stateUpdater.add(activeTask);
} else {
activeTasksPerId.put(activeTask.id(), activeTask);
pendingActiveTasks.remove(activeTask.id());
for (final TopicPartition topicPartition : activeTask.inputPartitions()) {
activeTasksPerPartition.put(topicPartition, activeTask);
}
}
}
}
@ -160,8 +167,12 @@ class Tasks {
if (!standbyTasksToCreate.isEmpty()) {
for (final Task standbyTask : standbyTaskCreator.createTasks(standbyTasksToCreate)) {
standbyTasksPerId.put(standbyTask.id(), standbyTask);
pendingActiveTasks.remove(standbyTask.id());
if (stateUpdater != null) {
stateUpdater.add(standbyTask);
} else {
standbyTasksPerId.put(standbyTask.id(), standbyTask);
pendingActiveTasks.remove(standbyTask.id());
}
}
}
}

View File

@ -33,7 +33,6 @@ import org.mockito.InOrder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -129,7 +128,7 @@ class DefaultStateUpdaterTest {
@Test
public void shouldThrowIfStatefulTaskNotInStateRestoring() {
shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)));
shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)));
}
private void shouldThrowIfActiveTaskNotInStateRestoring(final StreamTask task) {
@ -138,7 +137,7 @@ class DefaultStateUpdaterTest {
@Test
public void shouldThrowIfStandbyTaskNotInStateRunning() {
final StandbyTask task = createStandbyTask(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask task = createStandbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
}
@ -153,29 +152,29 @@ class DefaultStateUpdaterTest {
@Test
public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
shouldThrowIfAddingTasksWithSameId(task2, task1);
}
@ -218,7 +217,7 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreSingleActiveStatefulTask() throws Exception {
final StreamTask task =
createActiveStatefulTaskInStateRestoring(TASK_0_0, Arrays.asList(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
@ -244,9 +243,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_C_0))
@ -298,16 +297,16 @@ class DefaultStateUpdaterTest {
public void shouldUpdateSingleStandbyTask() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(
TASK_0_0,
Arrays.asList(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)
mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)
);
shouldUpdateStandbyTasks(task);
}
@Test
public void shouldUpdateMultipleStandbyTasks() throws Exception {
final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
shouldUpdateStandbyTasks(task1, task2, task3);
}
@ -332,10 +331,10 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
@ -362,9 +361,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
@ -392,9 +391,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final TaskCorruptedException taskCorruptedException =
new TaskCorruptedException(mkSet(activeTask1.id(), activeTask2.id()));
final Map<TaskId, Task> updatingTasks1 = mkMap(
@ -420,9 +419,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -442,13 +441,13 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRemoveActiveStatefulTask() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldRemoveStatefulTask(task);
}
@Test
public void shouldRemoveStandbyTask() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldRemoveStatefulTask(task);
}
@ -471,8 +470,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRemovePausedTask() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
stateUpdater.start();
stateUpdater.add(task1);
@ -497,7 +496,7 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotRemoveTaskFromRestoredActiveTasks(task);
}
@ -508,7 +507,7 @@ class DefaultStateUpdaterTest {
}
private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -528,18 +527,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotRemoveTaskFromFailedTasks(task);
}
@Test
public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotRemoveTaskFromFailedTasks(task);
}
private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@ -569,22 +568,22 @@ class DefaultStateUpdaterTest {
@Test
public void shouldPauseActiveStatefulTask() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldPauseStatefulTask(task);
verify(changelogReader, never()).transitToUpdateStandby();
}
@Test
public void shouldPauseStandbyTask() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldPauseStatefulTask(task);
verify(changelogReader, times(1)).transitToUpdateStandby();
}
@Test
public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
stateUpdater.start();
stateUpdater.add(task1);
@ -630,8 +629,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -650,18 +649,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotPauseTaskInFailedTasks(task);
}
@Test
public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotPauseTaskInFailedTasks(task);
}
private void shouldNotPauseTaskInFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@ -690,13 +689,13 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotPauseTaskInRemovedTasks(task);
}
@ -725,14 +724,14 @@ class DefaultStateUpdaterTest {
@Test
public void shouldResumeActiveStatefulTask() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldResumeStatefulTask(task);
verify(changelogReader, times(2)).enforceRestoreActive();
}
@Test
public void shouldResumeStandbyTask() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldResumeStatefulTask(task);
verify(changelogReader, times(2)).transitToUpdateStandby();
}
@ -766,8 +765,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -787,13 +786,13 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotResumeTaskInRemovedTasks(task);
}
@ -818,18 +817,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotPauseTaskInFailedTasks(task);
}
@Test
public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception {
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
shouldNotResumeTaskInFailedTasks(task);
}
private void shouldNotResumeTaskInFailedTasks(final Task task) throws Exception {
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@ -862,15 +861,15 @@ class DefaultStateUpdaterTest {
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
stateUpdater.add(task1);
stateUpdater.remove(task1.id());
verifyDrainingRemovedTasks(task1);
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0));
stateUpdater.add(task2);
stateUpdater.remove(task2.id());
stateUpdater.add(task3);
@ -883,8 +882,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException = new StreamsException(exceptionMessage);
final Map<TaskId, Task> updatingTasks = mkMap(
@ -907,9 +906,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
final StreamsException streamsException2 = new StreamsException(exceptionMessage, task3.id());
@ -945,9 +944,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final Set<TaskId> expectedTaskIds = mkSet(task1.id(), task2.id());
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(expectedTaskIds);
final Map<TaskId, Task> updatingTasks = mkMap(
@ -971,8 +970,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!");
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
@ -996,10 +995,10 @@ class DefaultStateUpdaterTest {
public void shouldDrainFailedTasksAndExceptions() throws Exception {
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_C_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_D_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0));
final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0));
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
final Map<TaskId, Task> updatingTasks1 = mkMap(
@ -1044,10 +1043,10 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAutoCheckpointTasksOnInterval() throws Exception {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1070,10 +1069,10 @@ class DefaultStateUpdaterTest {
final Time time = new MockTime();
final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
try {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1106,11 +1105,11 @@ class DefaultStateUpdaterTest {
public void shouldGetTasksFromInputQueue() {
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
stateUpdater.add(activeTask1);
stateUpdater.add(standbyTask1);
stateUpdater.add(standbyTask2);
@ -1136,11 +1135,11 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromUpdatingTasks() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_C_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1169,8 +1168,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromRestoredActiveTasks() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1187,9 +1186,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception {
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
final TaskCorruptedException taskCorruptedException =
new TaskCorruptedException(mkSet(standbyTask1.id(), standbyTask2.id()));
final StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask1.id());
@ -1221,9 +1220,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromRemovedTasks() throws Exception {
final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_1));
final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@ -1244,8 +1243,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromPausedTasks() throws Exception {
final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_0));
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
@ -1456,14 +1455,14 @@ class DefaultStateUpdaterTest {
}
private StreamTask createActiveStatefulTaskInStateRestoring(final TaskId taskId,
final Collection<TopicPartition> changelogPartitions) {
final Set<TopicPartition> changelogPartitions) {
final StreamTask task = createActiveStatefulTask(taskId, changelogPartitions);
when(task.state()).thenReturn(State.RESTORING);
return task;
}
private StreamTask createActiveStatefulTask(final TaskId taskId,
final Collection<TopicPartition> changelogPartitions) {
final Set<TopicPartition> changelogPartitions) {
final StreamTask task = mock(StreamTask.class);
setupStatefulTask(task, taskId, changelogPartitions);
when(task.isActive()).thenReturn(true);
@ -1485,14 +1484,14 @@ class DefaultStateUpdaterTest {
}
private StandbyTask createStandbyTaskInStateRunning(final TaskId taskId,
final Collection<TopicPartition> changelogPartitions) {
final Set<TopicPartition> changelogPartitions) {
final StandbyTask task = createStandbyTask(taskId, changelogPartitions);
when(task.state()).thenReturn(State.RUNNING);
return task;
}
private StandbyTask createStandbyTask(final TaskId taskId,
final Collection<TopicPartition> changelogPartitions) {
final Set<TopicPartition> changelogPartitions) {
final StandbyTask task = mock(StandbyTask.class);
setupStatefulTask(task, taskId, changelogPartitions);
when(task.isActive()).thenReturn(false);
@ -1501,7 +1500,7 @@ class DefaultStateUpdaterTest {
private void setupStatefulTask(final Task task,
final TaskId taskId,
final Collection<TopicPartition> changelogPartitions) {
final Set<TopicPartition> changelogPartitions) {
when(task.changelogPartitions()).thenReturn(changelogPartitions);
when(task.id()).thenReturn(taskId);
}

View File

@ -756,7 +756,8 @@ public class StreamThreadTest {
null,
topologyMetadata,
null,
null
null,
config
) {
@Override
int commit(final Collection<Task> tasksToCommit) {
@ -857,7 +858,8 @@ public class StreamThreadTest {
standbyTaskCreator,
topologyMetadata,
null,
null
null,
config
) {
@Override
int commit(final Collection<Task> tasksToCommit) {

View File

@ -35,6 +35,7 @@ import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
@ -92,6 +93,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.eq;
@ -193,7 +195,8 @@ public class TaskManagerTest {
standbyTaskCreator,
topologyMetadata,
adminClient,
stateDirectory
stateDirectory,
new StreamsConfig(getStreamsConfig("test-app"))
);
taskManager.setMainConsumer(consumer);
reset(topologyBuilder);
@ -1615,8 +1618,8 @@ public class TaskManagerTest {
);
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
@Override
public Collection<TopicPartition> changelogPartitions() {
return singletonList(changelog);
public Set<TopicPartition> changelogPartitions() {
return singleton(changelog);
}
};
final AtomicBoolean closedDirtyTask01 = new AtomicBoolean(false);
@ -1728,8 +1731,8 @@ public class TaskManagerTest {
);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
@Override
public Collection<TopicPartition> changelogPartitions() {
return singletonList(changelog);
public Set<TopicPartition> changelogPartitions() {
return singleton(changelog);
}
};
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@ -1781,8 +1784,8 @@ public class TaskManagerTest {
);
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
@Override
public Collection<TopicPartition> changelogPartitions() {
return singletonList(changelog);
public Set<TopicPartition> changelogPartitions() {
return singleton(changelog);
}
};
@ -1896,8 +1899,8 @@ public class TaskManagerTest {
);
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
@Override
public Collection<TopicPartition> changelogPartitions() {
return singletonList(changelog);
public Set<TopicPartition> changelogPartitions() {
return singleton(changelog);
}
};
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true) {
@ -2734,8 +2737,8 @@ public class TaskManagerTest {
public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
@Override
public Collection<TopicPartition> changelogPartitions() {
return singletonList(new TopicPartition("fake", 0));
public Set<TopicPartition> changelogPartitions() {
return singleton(new TopicPartition("fake", 0));
}
};
@ -3453,7 +3456,7 @@ public class TaskManagerTest {
}
@Override
public Collection<TopicPartition> changelogPartitions() {
public Set<TopicPartition> changelogPartitions() {
return changelogOffsets.keySet();
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TasksTest {
private final static TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0);
private final static TopicPartition TOPIC_PARTITION_A_1 = new TopicPartition("topicA", 1);
private final static TaskId TASK_0_0 = new TaskId(0, 0);
private final static TaskId TASK_0_1 = new TaskId(0, 1);
private final static TaskId TASK_1_0 = new TaskId(1, 0);
private final LogContext logContext = new LogContext();
private final ActiveTaskCreator activeTaskCreator = mock(ActiveTaskCreator.class);
private final StandbyTaskCreator standbyTaskCreator = mock(StandbyTaskCreator.class);
private final StateUpdater stateUpdater = mock(StateUpdater.class);
private Consumer<byte[], byte[]> mainConsumer = null;
@Test
public void shouldCreateTasksWithStateUpdater() {
final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, stateUpdater);
tasks.setMainConsumer(mainConsumer);
final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build();
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build();
final StreamTask statelessTask = statelessTask(TASK_1_0).build();
final Map<TaskId, Set<TopicPartition>> activeTasks = mkMap(
mkEntry(statefulTask.id(), statefulTask.changelogPartitions()),
mkEntry(statelessTask.id(), statelessTask.changelogPartitions())
);
final Map<TaskId, Set<TopicPartition>> standbyTasks =
mkMap(mkEntry(standbyTask.id(), standbyTask.changelogPartitions()));
when(activeTaskCreator.createTasks(mainConsumer, activeTasks)).thenReturn(Arrays.asList(statefulTask, statelessTask));
when(standbyTaskCreator.createTasks(standbyTasks)).thenReturn(Collections.singletonList(standbyTask));
tasks.createTasks(activeTasks, standbyTasks);
final Exception exceptionForStatefulTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statefulTask.id()));
assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTask.getMessage());
assertFalse(tasks.activeTasks().contains(statefulTask));
assertFalse(tasks.allTasks().contains(statefulTask));
final Exception exceptionForStatefulTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statefulTask.id())));
assertEquals("Task unknown: " + statefulTask.id(), exceptionForStatefulTaskOnTasks.getMessage());
final Exception exceptionForStatelessTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(statelessTask.id()));
assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTask.getMessage());
assertFalse(tasks.activeTasks().contains(statelessTask));
assertFalse(tasks.allTasks().contains(statelessTask));
final Exception exceptionForStatelessTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(statelessTask.id())));
assertEquals("Task unknown: " + statelessTask.id(), exceptionForStatelessTaskOnTasks.getMessage());
final Exception exceptionForStandbyTaskOnTask = assertThrows(IllegalStateException.class, () -> tasks.task(standbyTask.id()));
assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandbyTaskOnTask.getMessage());
assertFalse(tasks.allTasks().contains(standbyTask));
final Exception exceptionForStandByTaskOnTasks = assertThrows(IllegalStateException.class, () -> tasks.tasks(mkSet(standbyTask.id())));
assertEquals("Task unknown: " + standbyTask.id(), exceptionForStandByTaskOnTasks.getMessage());
verify(activeTaskCreator).createTasks(mainConsumer, activeTasks);
verify(standbyTaskCreator).createTasks(standbyTasks);
verify(stateUpdater).add(statefulTask);
}
@Test
public void shouldCreateTasksWithoutStateUpdater() {
final Tasks tasks = new Tasks(logContext, activeTaskCreator, standbyTaskCreator, null);
tasks.setMainConsumer(mainConsumer);
final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build();
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build();
final StreamTask statelessTask = statelessTask(TASK_1_0).build();
final Map<TaskId, Set<TopicPartition>> activeTasks = mkMap(
mkEntry(statefulTask.id(), statefulTask.changelogPartitions()),
mkEntry(statelessTask.id(), statelessTask.changelogPartitions())
);
final Map<TaskId, Set<TopicPartition>> standbyTasks =
mkMap(mkEntry(standbyTask.id(), standbyTask.changelogPartitions()));
when(activeTaskCreator.createTasks(mainConsumer, activeTasks)).thenReturn(Arrays.asList(statefulTask, statelessTask));
when(standbyTaskCreator.createTasks(standbyTasks)).thenReturn(Collections.singletonList(standbyTask));
tasks.createTasks(activeTasks, standbyTasks);
assertEquals(statefulTask, tasks.task(statefulTask.id()));
assertTrue(tasks.activeTasks().contains(statefulTask));
assertTrue(tasks.allTasks().contains(statefulTask));
assertTrue(tasks.tasks(mkSet(statefulTask.id())).contains(statefulTask));
assertEquals(statelessTask, tasks.task(statelessTask.id()));
assertTrue(tasks.activeTasks().contains(statelessTask));
assertTrue(tasks.allTasks().contains(statelessTask));
assertTrue(tasks.tasks(mkSet(statelessTask.id())).contains(statelessTask));
assertEquals(standbyTask, tasks.task(standbyTask.id()));
assertTrue(tasks.allTasks().contains(standbyTask));
assertTrue(tasks.tasks(mkSet(standbyTask.id())).contains(standbyTask));
verify(activeTaskCreator).createTasks(mainConsumer, activeTasks);
verify(standbyTaskCreator).createTasks(standbyTasks);
verify(stateUpdater, never()).add(statefulTask);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.test;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
@ -25,12 +26,17 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.KeyValueIterator;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
@ -48,6 +54,8 @@ import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public final class StreamsTestUtils {
private StreamsTestUtils() {}
@ -273,4 +281,70 @@ public final class StreamsTestUtils {
return Arrays.stream(Thread.currentThread().getStackTrace())
.anyMatch(caller -> "org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && "checkSupplier".equals(caller.getMethodName()));
}
public static StreamTask createStatefulTask(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StreamTask task = mock(StreamTask.class);
setupStatefulTask(task, taskId, changelogPartitions);
when(task.isActive()).thenReturn(true);
return task;
}
public static StandbyTask createStandbyTask(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StandbyTask task = mock(StandbyTask.class);
setupStatefulTask(task, taskId, changelogPartitions);
when(task.isActive()).thenReturn(false);
return task;
}
private static void setupStatefulTask(final Task task,
final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
when(task.changelogPartitions()).thenReturn(changelogPartitions);
when(task.id()).thenReturn(taskId);
}
public static class TaskBuilder<T extends Task> {
private final T task;
private TaskBuilder(final T task) {
this.task = task;
}
public static TaskBuilder<StreamTask> statelessTask(final TaskId taskId) {
final StreamTask task = mock(StreamTask.class);
when(task.changelogPartitions()).thenReturn(Collections.emptySet());
when(task.isActive()).thenReturn(true);
when(task.id()).thenReturn(taskId);
return new TaskBuilder<>(task);
}
public static TaskBuilder<StreamTask> statefulTask(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StreamTask task = mock(StreamTask.class);
when(task.isActive()).thenReturn(true);
setupStatefulTask(task, taskId, changelogPartitions);
return new TaskBuilder<>(task);
}
public static TaskBuilder<StandbyTask> standbyTask(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StandbyTask task = mock(StandbyTask.class);
when(task.isActive()).thenReturn(false);
setupStatefulTask(task, taskId, changelogPartitions);
return new TaskBuilder<>(task);
}
private static void setupStatefulTask(final Task task,
final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
when(task.changelogPartitions()).thenReturn(changelogPartitions);
when(task.id()).thenReturn(taskId);
}
public T build() {
return task;
}
}
}