|
|
|
@ -61,8 +61,6 @@ import org.junit.jupiter.api.BeforeEach;
|
|
|
|
|
import org.junit.jupiter.api.Test;
|
|
|
|
|
import org.junit.jupiter.api.extension.ExtendWith;
|
|
|
|
|
import org.junit.jupiter.api.io.TempDir;
|
|
|
|
|
import org.junit.jupiter.params.ParameterizedTest;
|
|
|
|
|
import org.junit.jupiter.params.provider.ValueSource;
|
|
|
|
|
import org.mockito.InOrder;
|
|
|
|
|
import org.mockito.Mock;
|
|
|
|
|
import org.mockito.junit.jupiter.MockitoExtension;
|
|
|
|
@ -216,24 +214,15 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
@BeforeEach
|
|
|
|
|
public void setUp() {
|
|
|
|
|
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
|
|
|
|
|
taskManager = setUpTaskManagerWithoutStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks) {
|
|
|
|
|
return setUpTaskManager(processingMode, tasks, false);
|
|
|
|
|
private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode processingMode, final TasksRegistry tasks) {
|
|
|
|
|
return setUpTaskManagerWithStateUpdater(processingMode, tasks, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private TaskManager setUpTaskManager(final ProcessingMode processingMode, final boolean stateUpdaterEnabled) {
|
|
|
|
|
return setUpTaskManager(processingMode, null, stateUpdaterEnabled, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks, final boolean stateUpdaterEnabled) {
|
|
|
|
|
return setUpTaskManager(processingMode, tasks, stateUpdaterEnabled, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private TaskManager setUpTaskManager(final ProcessingMode processingMode,
|
|
|
|
|
private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode processingMode,
|
|
|
|
|
final TasksRegistry tasks,
|
|
|
|
|
final boolean stateUpdaterEnabled,
|
|
|
|
|
final boolean processingThreadsEnabled) {
|
|
|
|
|
topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode));
|
|
|
|
|
final TaskManager taskManager = new TaskManager(
|
|
|
|
@ -247,7 +236,29 @@ public class TaskManagerTest {
|
|
|
|
|
topologyMetadata,
|
|
|
|
|
adminClient,
|
|
|
|
|
stateDirectory,
|
|
|
|
|
stateUpdaterEnabled ? stateUpdater : null,
|
|
|
|
|
stateUpdater,
|
|
|
|
|
processingThreadsEnabled ? schedulingTaskManager : null
|
|
|
|
|
);
|
|
|
|
|
taskManager.setMainConsumer(consumer);
|
|
|
|
|
return taskManager;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private TaskManager setUpTaskManagerWithoutStateUpdater(final ProcessingMode processingMode,
|
|
|
|
|
final TasksRegistry tasks,
|
|
|
|
|
final boolean processingThreadsEnabled) {
|
|
|
|
|
topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode));
|
|
|
|
|
final TaskManager taskManager = new TaskManager(
|
|
|
|
|
time,
|
|
|
|
|
changeLogReader,
|
|
|
|
|
ProcessId.randomProcessId(),
|
|
|
|
|
"taskManagerTest",
|
|
|
|
|
activeTaskCreator,
|
|
|
|
|
standbyTaskCreator,
|
|
|
|
|
tasks != null ? tasks : new Tasks(new LogContext()),
|
|
|
|
|
topologyMetadata,
|
|
|
|
|
adminClient,
|
|
|
|
|
stateDirectory,
|
|
|
|
|
null,
|
|
|
|
|
processingThreadsEnabled ? schedulingTaskManager : null
|
|
|
|
|
);
|
|
|
|
|
taskManager.setMainConsumer(consumer);
|
|
|
|
@ -261,7 +272,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId00Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
|
|
|
|
|
when(tasks.task(taskId00)).thenReturn(activeTask1);
|
|
|
|
|
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
|
|
|
|
@ -283,7 +294,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId01Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
|
|
|
|
|
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
|
|
|
|
|
|
|
|
|
@ -296,7 +307,7 @@ public class TaskManagerTest {
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
|
|
|
|
|
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
|
|
|
|
|
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
|
|
|
|
@ -319,7 +330,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId01Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
|
|
|
|
|
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
|
|
|
|
|
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
|
|
|
|
@ -339,7 +350,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId01Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
|
|
|
|
|
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
|
|
|
|
|
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
|
|
|
|
@ -359,7 +370,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId01Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
|
|
|
|
|
|
|
|
|
|
taskManager.resumePollingForPartitionsWithAvailableSpace();
|
|
|
|
@ -377,7 +388,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId01Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
|
|
|
|
|
|
|
|
|
|
taskManager.updateLags();
|
|
|
|
@ -392,7 +403,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RESTORING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
|
|
|
|
|
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
|
|
|
|
|
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
|
|
|
|
@ -412,7 +423,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RESTORING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
|
|
|
|
|
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
|
|
|
|
|
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
|
|
|
|
@ -433,7 +444,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
|
|
|
|
|
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
|
|
|
|
|
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
|
|
|
|
@ -453,7 +464,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
|
|
|
|
|
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
|
|
|
|
|
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
|
|
|
|
@ -474,7 +485,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTask));
|
|
|
|
|
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
|
|
|
|
|
when(stateUpdater.remove(failedStandbyTask.id())).thenReturn(future);
|
|
|
|
@ -501,7 +512,7 @@ public class TaskManagerTest {
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
|
|
|
|
|
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
|
|
|
|
|
when(stateUpdater.remove(activeTaskToUpdateInputPartitions.id())).thenReturn(future);
|
|
|
|
@ -529,7 +540,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.CREATED)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
|
|
|
|
|
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId03Partitions))
|
|
|
|
|
.thenReturn(recycledStandbyTask);
|
|
|
|
@ -553,7 +564,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RESTORING)
|
|
|
|
|
.withInputPartitions(taskId00Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
|
|
|
|
|
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, activeTaskToRecycle.inputPartitions()))
|
|
|
|
|
.thenThrow(new RuntimeException());
|
|
|
|
@ -583,7 +594,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.CREATED)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
|
|
|
|
|
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, taskId03Partitions, consumer))
|
|
|
|
|
.thenReturn(recycledActiveTask);
|
|
|
|
@ -607,7 +618,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
|
|
|
|
|
when(activeTaskCreator.createActiveTaskFromStandby(
|
|
|
|
|
standbyTaskToRecycle,
|
|
|
|
@ -637,7 +648,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RESTORING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask));
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(
|
|
|
|
@ -656,7 +667,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.SUSPENDED)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(
|
|
|
|
@ -676,7 +687,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RESTORING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToRecycle));
|
|
|
|
|
final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!");
|
|
|
|
|
when(stateUpdater.remove(failedActiveTaskToRecycle.id()))
|
|
|
|
@ -706,7 +717,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTaskToRecycle));
|
|
|
|
|
final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!");
|
|
|
|
|
when(stateUpdater.remove(failedStandbyTaskToRecycle.id()))
|
|
|
|
@ -736,7 +747,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RESTORING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToReassign));
|
|
|
|
|
final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!");
|
|
|
|
|
when(stateUpdater.remove(failedActiveTaskToReassign.id()))
|
|
|
|
@ -769,7 +780,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RESTORING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
|
|
|
|
|
when(stateUpdater.remove(reassignedActiveTask2.id()))
|
|
|
|
@ -795,7 +806,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(
|
|
|
|
@ -813,7 +824,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedStandbyTask));
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(
|
|
|
|
@ -837,7 +848,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.CREATED)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose, standbyTaskToRecycle));
|
|
|
|
|
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForActiveTaskToClose = new CompletableFuture<>();
|
|
|
|
|
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose);
|
|
|
|
@ -872,7 +883,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
|
|
|
|
|
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, runningActiveTask)));
|
|
|
|
@ -896,7 +907,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));
|
|
|
|
|
assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask)));
|
|
|
|
@ -908,7 +919,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.CREATED)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
final Set<Task> createdTasks = Set.of(activeTaskToBeCreated);
|
|
|
|
|
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
|
|
|
|
|
mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions()));
|
|
|
|
@ -926,7 +937,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.CREATED)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
final Set<Task> createdTasks = Set.of(standbyTaskToBeCreated);
|
|
|
|
|
when(standbyTaskCreator.createTasks(mkMap(
|
|
|
|
|
mkEntry(standbyTaskToBeCreated.id(), standbyTaskToBeCreated.inputPartitions())))
|
|
|
|
@ -953,7 +964,7 @@ public class TaskManagerTest {
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
|
|
|
|
|
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId01Partitions))
|
|
|
|
|
.thenReturn(standbyTask);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
|
|
|
|
|
|
|
|
|
@ -964,28 +975,6 @@ public class TaskManagerTest {
|
|
|
|
|
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldAddRecycledStandbyTasksFromActiveToTaskRegistryWithStateUpdaterDisabled() {
|
|
|
|
|
final StreamTask activeTaskToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions)
|
|
|
|
|
.withInputPartitions(taskId01Partitions)
|
|
|
|
|
.inState(State.RUNNING).build();
|
|
|
|
|
final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
|
|
|
|
|
.withInputPartitions(taskId01Partitions)
|
|
|
|
|
.inState(State.CREATED).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
when(tasks.allTasks()).thenReturn(Set.of(activeTaskToRecycle));
|
|
|
|
|
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId01Partitions))
|
|
|
|
|
.thenReturn(standbyTask);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
|
|
|
|
|
|
|
|
|
|
verify(activeTaskToRecycle).prepareCommit(true);
|
|
|
|
|
verify(tasks).replaceActiveWithStandby(standbyTask);
|
|
|
|
|
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
|
|
|
|
|
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled() {
|
|
|
|
|
final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, taskId03ChangelogPartitions)
|
|
|
|
@ -993,7 +982,7 @@ public class TaskManagerTest {
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
final IllegalStateException illegalStateException = assertThrows(
|
|
|
|
|
IllegalStateException.class,
|
|
|
|
@ -1014,7 +1003,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
|
|
|
|
@ -1032,7 +1021,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
|
|
|
|
|
|
|
|
|
|
final IllegalStateException illegalStateException = assertThrows(
|
|
|
|
@ -1052,7 +1041,7 @@ public class TaskManagerTest {
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
|
|
|
|
|
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, newInputPartitions)).thenReturn(true);
|
|
|
|
|
|
|
|
|
@ -1072,7 +1061,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(
|
|
|
|
@ -1090,7 +1079,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.SUSPENDED)
|
|
|
|
|
.withInputPartitions(taskId03Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(
|
|
|
|
@ -1112,7 +1101,7 @@ public class TaskManagerTest {
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
|
|
|
|
|
|
|
|
|
|
final IllegalStateException illegalStateException = assertThrows(
|
|
|
|
@ -1137,7 +1126,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.CREATED)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(
|
|
|
|
@ -1163,7 +1152,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01));
|
|
|
|
|
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
|
|
|
|
|
|
|
|
|
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
|
|
|
|
|
|
|
|
|
@ -1185,7 +1174,7 @@ public class TaskManagerTest {
|
|
|
|
|
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01));
|
|
|
|
|
final LockException lockException = new LockException("Where are my keys??");
|
|
|
|
|
doThrow(lockException).when(task00).initializeIfNeeded();
|
|
|
|
|
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
|
|
|
|
|
|
|
|
|
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
|
|
|
|
|
|
|
|
|
@ -1209,7 +1198,7 @@ public class TaskManagerTest {
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01));
|
|
|
|
|
doThrow(new LockException("Lock Exception!")).when(task00).initializeIfNeeded();
|
|
|
|
|
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
|
|
|
|
|
|
|
|
|
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
|
|
|
|
|
|
|
|
|
@ -1255,7 +1244,7 @@ public class TaskManagerTest {
|
|
|
|
|
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
|
|
|
|
|
final RuntimeException runtimeException = new RuntimeException("KABOOM!");
|
|
|
|
|
doThrow(runtimeException).when(task00).initializeIfNeeded();
|
|
|
|
|
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
|
|
|
|
|
|
|
|
|
final StreamsException streamsException = assertThrows(
|
|
|
|
|
StreamsException.class,
|
|
|
|
@ -1281,7 +1270,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.CREATED)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks, false);
|
|
|
|
|
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(statefulTask0, statefulTask1, statefulTask2));
|
|
|
|
|
doThrow(new TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded();
|
|
|
|
|
doThrow(new TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded();
|
|
|
|
@ -1302,7 +1291,7 @@ public class TaskManagerTest {
|
|
|
|
|
public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
|
|
|
|
|
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
|
|
|
|
|
}
|
|
|
|
@ -1311,7 +1300,7 @@ public class TaskManagerTest {
|
|
|
|
|
public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit() {
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
when(tasks.hasPendingTasksToInit()).thenReturn(true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
|
|
|
|
|
}
|
|
|
|
@ -1319,7 +1308,7 @@ public class TaskManagerTest {
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingInit() {
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
|
|
|
|
|
}
|
|
|
|
@ -1539,7 +1528,7 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
private TaskManager setupForRevocationAndLost(final Set<Task> tasksInStateUpdater,
|
|
|
|
|
final TasksRegistry tasks) {
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(tasksInStateUpdater);
|
|
|
|
|
|
|
|
|
|
return taskManager;
|
|
|
|
@ -1608,12 +1597,12 @@ public class TaskManagerTest {
|
|
|
|
|
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
|
|
|
|
|
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(statefulTasks);
|
|
|
|
|
|
|
|
|
|
return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
return setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false);
|
|
|
|
|
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
|
|
|
|
|
assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
|
|
|
|
|
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
|
|
|
|
@ -1631,7 +1620,7 @@ public class TaskManagerTest {
|
|
|
|
|
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
|
|
|
|
|
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
final StreamsException thrown = assertThrows(
|
|
|
|
|
StreamsException.class,
|
|
|
|
@ -1658,7 +1647,7 @@ public class TaskManagerTest {
|
|
|
|
|
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0, exceptionAndTasks1));
|
|
|
|
|
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
final TaskCorruptedException thrown = assertThrows(
|
|
|
|
|
TaskCorruptedException.class,
|
|
|
|
@ -1742,7 +1731,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId00Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
|
|
|
|
|
final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
|
|
|
|
|
when(consumer.assignment()).thenReturn(assigned);
|
|
|
|
@ -1787,7 +1776,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask, restoringStatefulTask));
|
|
|
|
|
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
|
|
|
|
@ -1819,7 +1808,7 @@ public class TaskManagerTest {
|
|
|
|
|
when(runningStatefulTask.changelogOffsets())
|
|
|
|
|
.thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask)));
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
|
|
|
|
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
|
|
|
|
|
|
|
|
|
|
assertThat(
|
|
|
|
@ -1850,7 +1839,7 @@ public class TaskManagerTest {
|
|
|
|
|
final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
|
|
|
|
|
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
|
|
|
|
|
taskManager.handleRebalanceStart(singleton("topic"));
|
|
|
|
|
|
|
|
|
@ -1868,7 +1857,7 @@ public class TaskManagerTest {
|
|
|
|
|
final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
|
|
|
|
|
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask));
|
|
|
|
|
taskManager.handleRebalanceStart(singleton("topic"));
|
|
|
|
|
|
|
|
|
@ -1893,7 +1882,7 @@ public class TaskManagerTest {
|
|
|
|
|
when(restoringStandbyTask.changelogOffsets())
|
|
|
|
|
.thenReturn(mkMap(mkEntry(t1p2changelog, changelogOffsetOfRestoringStandbyTask)));
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, restoringStatefulTask));
|
|
|
|
|
|
|
|
|
@ -1918,7 +1907,7 @@ public class TaskManagerTest {
|
|
|
|
|
mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
|
|
|
|
|
));
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
|
|
|
|
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, restoringStatefulTask)));
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
|
|
|
|
|
|
|
|
|
@ -1955,17 +1944,23 @@ public class TaskManagerTest {
|
|
|
|
|
);
|
|
|
|
|
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));
|
|
|
|
|
|
|
|
|
|
final StandbyTask standbyTask = standbyTask(taskId00, taskId00ChangelogPartitions)
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId00Partitions)
|
|
|
|
|
.build();
|
|
|
|
|
when(standbyTask.changelogOffsets()).thenReturn(changelogOffsets);
|
|
|
|
|
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
|
|
|
|
|
|
|
|
|
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask));
|
|
|
|
|
|
|
|
|
|
expectLockObtainedFor(taskId00);
|
|
|
|
|
expectDirectoryNotEmpty(taskId00);
|
|
|
|
|
makeTaskFolders(taskId00.toString());
|
|
|
|
|
|
|
|
|
|
taskManager.handleRebalanceStart(singleton("topic"));
|
|
|
|
|
final StateMachineTask restoringTask = handleAssignment(
|
|
|
|
|
emptyMap(),
|
|
|
|
|
taskId00Assignment,
|
|
|
|
|
emptyMap()
|
|
|
|
|
).get(taskId00);
|
|
|
|
|
restoringTask.setChangelogOffsets(changelogOffsets);
|
|
|
|
|
taskManager.handleAssignment(emptyMap(), taskId00Assignment);
|
|
|
|
|
|
|
|
|
|
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
|
|
|
|
|
}
|
|
|
|
@ -2174,7 +2169,7 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
|
|
|
|
|
|
|
|
|
|
taskManager.handleLostAll();
|
|
|
|
|
|
|
|
|
@ -2190,7 +2185,7 @@ public class TaskManagerTest {
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.withInputPartitions(taskId02Partitions).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
|
|
|
|
|
when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
|
|
|
|
|
|
|
|
|
@ -2281,38 +2276,43 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
|
|
|
|
|
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
|
|
|
|
|
final StreamTask corruptedTask = statefulTask(taskId00, taskId00ChangelogPartitions)
|
|
|
|
|
.withInputPartitions(taskId00Partitions)
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
|
|
|
|
|
final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
|
|
|
|
|
final StreamTask nonCorruptedTask = statefulTask(taskId01, taskId01ChangelogPartitions)
|
|
|
|
|
.withInputPartitions(taskId01Partitions)
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
final Map<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<>(taskId00Assignment);
|
|
|
|
|
firstAssignment.putAll(taskId01Assignment);
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
when(tasks.task(taskId00)).thenReturn(corruptedTask);
|
|
|
|
|
when(tasks.allTasksPerId()).thenReturn(mkMap(
|
|
|
|
|
mkEntry(taskId00, corruptedTask),
|
|
|
|
|
mkEntry(taskId01, nonCorruptedTask)
|
|
|
|
|
));
|
|
|
|
|
when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
|
|
|
|
|
|
|
|
|
|
// `handleAssignment`
|
|
|
|
|
when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
|
|
|
|
|
.thenReturn(asList(corruptedTask, nonCorruptedTask));
|
|
|
|
|
when(nonCorruptedTask.commitNeeded()).thenReturn(true);
|
|
|
|
|
when(nonCorruptedTask.prepareCommit(true)).thenReturn(emptyMap());
|
|
|
|
|
when(corruptedTask.prepareCommit(false)).thenReturn(emptyMap());
|
|
|
|
|
doNothing().when(corruptedTask).postCommit(anyBoolean());
|
|
|
|
|
|
|
|
|
|
when(consumer.assignment())
|
|
|
|
|
.thenReturn(assignment)
|
|
|
|
|
.thenReturn(taskId00Partitions);
|
|
|
|
|
when(consumer.assignment()).thenReturn(taskId00Partitions);
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(firstAssignment, emptyMap());
|
|
|
|
|
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
assertThat(nonCorruptedTask.state(), is(Task.State.RUNNING));
|
|
|
|
|
nonCorruptedTask.setCommitNeeded();
|
|
|
|
|
taskManager.handleCorruption(Set.of(taskId00));
|
|
|
|
|
|
|
|
|
|
corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L));
|
|
|
|
|
taskManager.handleCorruption(singleton(taskId00));
|
|
|
|
|
|
|
|
|
|
assertTrue(nonCorruptedTask.commitPrepared);
|
|
|
|
|
assertThat(nonCorruptedTask.partitionsForOffsetReset, equalTo(Collections.emptySet()));
|
|
|
|
|
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
|
|
|
|
|
verify(nonCorruptedTask).prepareCommit(true);
|
|
|
|
|
verify(nonCorruptedTask, never()).addPartitionsForOffsetReset(any());
|
|
|
|
|
verify(corruptedTask).addPartitionsForOffsetReset(taskId00Partitions);
|
|
|
|
|
verify(corruptedTask).changelogPartitions();
|
|
|
|
|
verify(corruptedTask).postCommit(true);
|
|
|
|
|
|
|
|
|
|
// check that we should not commit empty map either
|
|
|
|
|
verify(consumer, never()).commitSync(emptyMap());
|
|
|
|
|
verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
@ -2359,7 +2359,7 @@ public class TaskManagerTest {
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, corruptedTask)));
|
|
|
|
|
when(tasks.task(taskId02)).thenReturn(corruptedTask);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
|
|
|
|
|
|
|
|
|
|
taskManager.handleCorruption(Set.of(taskId02));
|
|
|
|
@ -2372,37 +2372,6 @@ public class TaskManagerTest {
|
|
|
|
|
verify(standbyTask, never()).postCommit(anyBoolean());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitRunningStandbyTasksWithStateUpdaterDisabled() {
|
|
|
|
|
final StreamTask activeRestoringTask = statefulTask(taskId00, taskId00ChangelogPartitions)
|
|
|
|
|
.withInputPartitions(taskId00Partitions)
|
|
|
|
|
.inState(State.RESTORING).build();
|
|
|
|
|
final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
|
|
|
|
|
.withInputPartitions(taskId01Partitions)
|
|
|
|
|
.inState(State.RUNNING).build();
|
|
|
|
|
when(standbyTask.commitNeeded()).thenReturn(true);
|
|
|
|
|
final StreamTask corruptedTask = statefulTask(taskId02, taskId02ChangelogPartitions)
|
|
|
|
|
.withInputPartitions(taskId02Partitions)
|
|
|
|
|
.inState(State.RUNNING).build();
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
when(tasks.allTasksPerId()).thenReturn(mkMap(
|
|
|
|
|
mkEntry(taskId00, activeRestoringTask),
|
|
|
|
|
mkEntry(taskId01, standbyTask),
|
|
|
|
|
mkEntry(taskId02, corruptedTask)
|
|
|
|
|
));
|
|
|
|
|
when(tasks.task(taskId02)).thenReturn(corruptedTask);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
|
|
|
|
when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
|
|
|
|
|
|
|
|
|
|
taskManager.handleCorruption(Set.of(taskId02));
|
|
|
|
|
|
|
|
|
|
verify(activeRestoringTask, never()).commitNeeded();
|
|
|
|
|
verify(activeRestoringTask, never()).prepareCommit(true);
|
|
|
|
|
verify(activeRestoringTask, never()).postCommit(anyBoolean());
|
|
|
|
|
verify(standbyTask).prepareCommit(true);
|
|
|
|
|
verify(standbyTask).postCommit(anyBoolean());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
|
|
|
|
|
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
|
|
|
|
@ -2545,7 +2514,7 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() {
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
|
|
|
|
|
final StreamsProducer producer = mock(StreamsProducer.class);
|
|
|
|
|
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
|
|
|
|
|
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
|
|
|
|
@ -2677,7 +2646,7 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS() {
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
|
|
|
|
|
final StreamsProducer producer = mock(StreamsProducer.class);
|
|
|
|
|
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
|
|
|
|
|
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
|
|
|
|
@ -2919,7 +2888,7 @@ public class TaskManagerTest {
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
|
|
|
|
|
final StreamsProducer producer = mock(StreamsProducer.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
|
|
|
|
|
|
|
|
|
|
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
|
|
|
|
|
final Map<TopicPartition, OffsetAndMetadata> offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
|
|
|
|
@ -3076,7 +3045,7 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() {
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
|
|
|
|
|
|
|
|
|
|
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
|
|
|
|
|
|
|
|
|
@ -3212,7 +3181,7 @@ public class TaskManagerTest {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final ProcessingMode processingMode) {
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(processingMode, null, false);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(processingMode, null, false);
|
|
|
|
|
|
|
|
|
|
final TopicPartition changelog = new TopicPartition("changelog", 0);
|
|
|
|
|
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
|
|
|
|
@ -3367,7 +3336,7 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() {
|
|
|
|
|
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
|
|
|
|
|
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
|
|
|
|
|
|
|
|
|
|
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
|
|
|
|
|
|
|
|
|
@ -3533,7 +3502,7 @@ public class TaskManagerTest {
|
|
|
|
|
new ExceptionAndTask(new RuntimeException(), failedStatefulTask),
|
|
|
|
|
new ExceptionAndTask(new RuntimeException(), failedStandbyTask))
|
|
|
|
|
);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
taskManager.shutdown(true);
|
|
|
|
|
|
|
|
|
@ -3547,7 +3516,7 @@ public class TaskManagerTest {
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldShutdownSchedulingTaskManager() {
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
|
|
|
|
|
taskManager.shutdown(true);
|
|
|
|
|
|
|
|
|
@ -3596,7 +3565,7 @@ public class TaskManagerTest {
|
|
|
|
|
new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStatefulTaskDuringRemoval),
|
|
|
|
|
new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStandbyTaskDuringRemoval)
|
|
|
|
|
));
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
futureForRemovedStatefulTask.complete(new StateUpdater.RemovedTaskResult(removedStatefulTask));
|
|
|
|
|
futureForRemovedStandbyTask.complete(new StateUpdater.RemovedTaskResult(removedStandbyTask));
|
|
|
|
|
futureForRemovedFailedStatefulTask
|
|
|
|
@ -3825,7 +3794,7 @@ public class TaskManagerTest {
|
|
|
|
|
allOffsets.putAll(offsetsT01);
|
|
|
|
|
allOffsets.putAll(offsetsT02);
|
|
|
|
|
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
|
|
|
|
|
|
|
|
|
|
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
|
|
|
|
|
task01.setCommittableOffsetsAndMetadata(offsetsT01);
|
|
|
|
@ -4248,28 +4217,30 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldPunctuateActiveTasks() {
|
|
|
|
|
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
|
|
|
|
|
@Override
|
|
|
|
|
public boolean maybePunctuateStreamTime() {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean maybePunctuateSystemTime() {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
|
|
|
|
|
.withInputPartitions(taskId00Partitions)
|
|
|
|
|
.inState(State.RUNNING)
|
|
|
|
|
.build();
|
|
|
|
|
|
|
|
|
|
when(consumer.assignment()).thenReturn(assignment);
|
|
|
|
|
when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
|
|
|
|
|
when(task00.maybePunctuateStreamTime()).thenReturn(true);
|
|
|
|
|
when(task00.maybePunctuateSystemTime()).thenReturn(true);
|
|
|
|
|
|
|
|
|
|
taskManager.handleAssignment(taskId00Assignment, emptyMap());
|
|
|
|
|
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
|
|
|
|
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
|
|
|
|
when(tasks.activeTasks()).thenReturn(Set.of(task00));
|
|
|
|
|
|
|
|
|
|
assertThat(task00.state(), is(Task.State.RUNNING));
|
|
|
|
|
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
|
|
|
|
|
when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(false);
|
|
|
|
|
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of());
|
|
|
|
|
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(List.of());
|
|
|
|
|
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
|
|
|
|
|
|
|
|
|
|
// one for stream and one for system time
|
|
|
|
|
assertThat(taskManager.punctuate(), equalTo(2));
|
|
|
|
|
|
|
|
|
|
verify(task00).maybePunctuateStreamTime();
|
|
|
|
|
verify(task00).maybePunctuateSystemTime();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
@ -4543,7 +4514,7 @@ public class TaskManagerTest {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() {
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
|
|
|
|
|
|
|
|
|
|
final StreamsProducer producer = mock(StreamsProducer.class);
|
|
|
|
|
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
|
|
|
|
@ -4748,7 +4719,7 @@ public class TaskManagerTest {
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldRecycleStartupTasksFromStateDirectoryAsActiveWithStateUpdater() {
|
|
|
|
|
final Tasks taskRegistry = new Tasks(new LogContext());
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
|
|
|
|
|
final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
|
|
|
|
|
|
|
|
|
|
final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build();
|
|
|
|
@ -4786,7 +4757,7 @@ public class TaskManagerTest {
|
|
|
|
|
@Test
|
|
|
|
|
public void shouldUseStartupTasksFromStateDirectoryAsStandbyWithStateUpdater() {
|
|
|
|
|
final Tasks taskRegistry = new Tasks(new LogContext());
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
|
|
|
|
|
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
|
|
|
|
|
final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
|
|
|
|
|
|
|
|
|
|
when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
|
|
|
|
@ -4816,18 +4787,6 @@ public class TaskManagerTest {
|
|
|
|
|
assertEquals(Collections.singletonMap(taskId00, startupTask), taskManager.standbyTaskMap());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ParameterizedTest
|
|
|
|
|
@ValueSource(booleans = {true, false})
|
|
|
|
|
public void shouldStartStateUpdaterOnInit(final boolean stateUpdaterEnabled) {
|
|
|
|
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, stateUpdaterEnabled);
|
|
|
|
|
taskManager.init();
|
|
|
|
|
if (stateUpdaterEnabled) {
|
|
|
|
|
verify(stateUpdater).start();
|
|
|
|
|
} else {
|
|
|
|
|
verify(stateUpdater, never()).start();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static KafkaFutureImpl<DeletedRecords> completedFuture() {
|
|
|
|
|
final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
|
|
|
|
|
futureDeletedRecords.complete(null);
|
|
|
|
|