explicit function renaming

This commit is contained in:
Shashank Hosahalli Shivamurthy 2025-10-03 17:06:30 -07:00
parent 06b65dc5d3
commit 85d1863856
1 changed files with 94 additions and 116 deletions

View File

@ -61,8 +61,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ -216,20 +214,16 @@ public class TaskManagerTest {
@BeforeEach
public void setUp() {
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false, false);
taskManager = setUpTaskManagerWithoutStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
}
private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks) {
return setUpTaskManager(processingMode, tasks, false);
private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode processingMode, final TasksRegistry tasks) {
return setUpTaskManagerWithStateUpdater(processingMode, tasks, false);
}
private TaskManager setUpTaskManager(final ProcessingMode processingMode, final boolean stateUpdaterEnabled) {
return setUpTaskManager(processingMode, null, stateUpdaterEnabled, false);
}
private TaskManager setUpTaskManager(final ProcessingMode processingMode,
final TasksRegistry tasks,
final boolean processingThreadsEnabled) {
private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode processingMode,
final TasksRegistry tasks,
final boolean processingThreadsEnabled) {
topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode));
final TaskManager taskManager = new TaskManager(
time,
@ -249,10 +243,9 @@ public class TaskManagerTest {
return taskManager;
}
private TaskManager setUpTaskManager(final ProcessingMode processingMode,
final TasksRegistry tasks,
final boolean stateUpdaterEnabled,
final boolean processingThreadsEnabled) {
private TaskManager setUpTaskManagerWithoutStateUpdater(final ProcessingMode processingMode,
final TasksRegistry tasks,
final boolean processingThreadsEnabled) {
topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode));
final TaskManager taskManager = new TaskManager(
time,
@ -265,7 +258,7 @@ public class TaskManagerTest {
topologyMetadata,
adminClient,
stateDirectory,
stateUpdaterEnabled ? stateUpdater : null,
null,
processingThreadsEnabled ? schedulingTaskManager : null
);
taskManager.setMainConsumer(consumer);
@ -279,7 +272,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
when(tasks.task(taskId00)).thenReturn(activeTask1);
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@ -301,7 +294,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@ -314,14 +307,14 @@ public class TaskManagerTest {
@Test
public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
taskManager.handleAssignment(
mkMap(mkEntry(taskId00, taskId00Partitions)),
mkMap(mkEntry(taskId01, taskId01Partitions))
mkMap(mkEntry(taskId01, taskId01Partitions))
);
verify(schedulingTaskManager).lockTasks(Set.of(taskId00, taskId01));
@ -337,7 +330,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@ -357,7 +350,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@ -377,7 +370,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
taskManager.resumePollingForPartitionsWithAvailableSpace();
@ -395,7 +388,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
taskManager.updateLags();
@ -410,7 +403,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@ -430,7 +423,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future);
@ -451,7 +444,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@ -471,7 +464,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future);
@ -492,7 +485,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTask));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.remove(failedStandbyTask.id())).thenReturn(future);
@ -519,7 +512,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
final CompletableFuture<StateUpdater.RemovedTaskResult> future = new CompletableFuture<>();
when(stateUpdater.remove(activeTaskToUpdateInputPartitions.id())).thenReturn(future);
@ -547,7 +540,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId03Partitions))
.thenReturn(recycledStandbyTask);
@ -571,7 +564,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, activeTaskToRecycle.inputPartitions()))
.thenThrow(new RuntimeException());
@ -601,7 +594,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, taskId03Partitions, consumer))
.thenReturn(recycledActiveTask);
@ -625,7 +618,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle));
when(activeTaskCreator.createActiveTaskFromStandby(
standbyTaskToRecycle,
@ -655,7 +648,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask));
taskManager.handleAssignment(
@ -674,7 +667,7 @@ public class TaskManagerTest {
.inState(State.SUSPENDED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
taskManager.handleAssignment(
@ -694,7 +687,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToRecycle));
final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!");
when(stateUpdater.remove(failedActiveTaskToRecycle.id()))
@ -724,7 +717,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTaskToRecycle));
final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!");
when(stateUpdater.remove(failedStandbyTaskToRecycle.id()))
@ -754,7 +747,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToReassign));
final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!");
when(stateUpdater.remove(failedActiveTaskToReassign.id()))
@ -787,7 +780,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
when(stateUpdater.remove(reassignedActiveTask2.id()))
@ -813,7 +806,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
taskManager.handleAssignment(
@ -831,7 +824,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedStandbyTask));
taskManager.handleAssignment(
@ -855,7 +848,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose, standbyTaskToRecycle));
final CompletableFuture<StateUpdater.RemovedTaskResult> futureForActiveTaskToClose = new CompletableFuture<>();
when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose);
@ -890,7 +883,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, runningActiveTask)));
@ -914,7 +907,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));
assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask)));
@ -926,7 +919,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final Set<Task> createdTasks = Set.of(activeTaskToBeCreated);
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions()));
@ -944,7 +937,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final Set<Task> createdTasks = Set.of(standbyTaskToBeCreated);
when(standbyTaskCreator.createTasks(mkMap(
mkEntry(standbyTaskToBeCreated.id(), standbyTaskToBeCreated.inputPartitions())))
@ -971,7 +964,7 @@ public class TaskManagerTest {
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId01Partitions))
.thenReturn(standbyTask);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
@ -989,7 +982,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@ -1010,7 +1003,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
@ -1028,7 +1021,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
final IllegalStateException illegalStateException = assertThrows(
@ -1048,7 +1041,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, newInputPartitions)).thenReturn(true);
@ -1068,7 +1061,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
@ -1086,7 +1079,7 @@ public class TaskManagerTest {
.inState(State.SUSPENDED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
@ -1108,7 +1101,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
final IllegalStateException illegalStateException = assertThrows(
@ -1133,7 +1126,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(
@ -1159,7 +1152,7 @@ public class TaskManagerTest {
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01));
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@ -1181,7 +1174,7 @@ public class TaskManagerTest {
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01));
final LockException lockException = new LockException("Where are my keys??");
doThrow(lockException).when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@ -1205,7 +1198,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01));
doThrow(new LockException("Lock Exception!")).when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@ -1251,7 +1244,7 @@ public class TaskManagerTest {
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00));
final RuntimeException runtimeException = new RuntimeException("KABOOM!");
doThrow(runtimeException).when(task00).initializeIfNeeded();
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
final StreamsException streamsException = assertThrows(
StreamsException.class,
@ -1277,7 +1270,7 @@ public class TaskManagerTest {
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks, false);
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(statefulTask0, statefulTask1, statefulTask2));
doThrow(new TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded();
doThrow(new TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded();
@ -1298,7 +1291,7 @@ public class TaskManagerTest {
public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
}
@ -1307,7 +1300,7 @@ public class TaskManagerTest {
public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit() {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.hasPendingTasksToInit()).thenReturn(true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
}
@ -1315,7 +1308,7 @@ public class TaskManagerTest {
@Test
public void shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingInit() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
}
@ -1534,8 +1527,8 @@ public class TaskManagerTest {
}
private TaskManager setupForRevocationAndLost(final Set<Task> tasksInStateUpdater,
final TasksRegistry tasks) {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TasksRegistry tasks) {
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(tasksInStateUpdater);
return taskManager;
@ -1571,7 +1564,7 @@ public class TaskManagerTest {
}
private void verifyTransitionToRunningOfRestoredTask(final Set<StreamTask> restoredTasks,
final TasksRegistry tasks) {
final TasksRegistry tasks) {
for (final StreamTask restoredTask : restoredTasks) {
verify(restoredTask).completeRestoration(noOpResetter);
verify(restoredTask).clearTaskTimeout();
@ -1600,16 +1593,16 @@ public class TaskManagerTest {
}
private TaskManager setUpTransitionToRunningOfRestoredTask(final Set<StreamTask> statefulTasks,
final TasksRegistry tasks) {
final TasksRegistry tasks) {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(statefulTasks);
return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
return setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
}
@Test
public void shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false);
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
@ -1627,7 +1620,7 @@ public class TaskManagerTest {
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final StreamsException thrown = assertThrows(
StreamsException.class,
@ -1654,7 +1647,7 @@ public class TaskManagerTest {
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0, exceptionAndTasks1));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final TaskCorruptedException thrown = assertThrows(
TaskCorruptedException.class,
@ -1738,7 +1731,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
when(consumer.assignment()).thenReturn(assigned);
@ -1783,7 +1776,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask, restoringStatefulTask));
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
@ -1815,7 +1808,7 @@ public class TaskManagerTest {
when(runningStatefulTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
assertThat(
@ -1846,7 +1839,7 @@ public class TaskManagerTest {
final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
taskManager.handleRebalanceStart(singleton("topic"));
@ -1864,7 +1857,7 @@ public class TaskManagerTest {
final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask));
taskManager.handleRebalanceStart(singleton("topic"));
@ -1889,7 +1882,7 @@ public class TaskManagerTest {
when(restoringStandbyTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p2changelog, changelogOffsetOfRestoringStandbyTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, restoringStatefulTask));
@ -1914,7 +1907,7 @@ public class TaskManagerTest {
mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, restoringStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
@ -1927,7 +1920,7 @@ public class TaskManagerTest {
}
private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> changelogOffsets,
final Map<TaskId, Long> expectedOffsetSums) throws Exception {
final Map<TaskId, Long> expectedOffsetSums) throws Exception {
expectLockObtainedFor(taskId00);
expectDirectoryNotEmpty(taskId00);
makeTaskFolders(taskId00.toString());
@ -1958,7 +1951,7 @@ public class TaskManagerTest {
when(standbyTask.changelogOffsets()).thenReturn(changelogOffsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask));
@ -2176,7 +2169,7 @@ public class TaskManagerTest {
@Test
public void shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
taskManager.handleLostAll();
@ -2192,7 +2185,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
@ -2308,7 +2301,7 @@ public class TaskManagerTest {
when(consumer.assignment()).thenReturn(taskId00Partitions);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.handleCorruption(Set.of(taskId00));
@ -2366,7 +2359,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, corruptedTask)));
when(tasks.task(taskId02)).thenReturn(corruptedTask);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
taskManager.handleCorruption(Set.of(taskId02));
@ -2379,7 +2372,6 @@ public class TaskManagerTest {
verify(standbyTask, never()).postCommit(anyBoolean());
}
@Test
public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
@ -2522,7 +2514,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
@ -2654,7 +2646,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
@ -2686,7 +2678,7 @@ public class TaskManagerTest {
mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions),
mkEntry(taskId02, taskId02Partitions)
);
);
when(consumer.assignment())
.thenReturn(assignment)
@ -2896,7 +2888,7 @@ public class TaskManagerTest {
@Test
public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
final StreamsProducer producer = mock(StreamsProducer.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@ -3053,7 +3045,7 @@ public class TaskManagerTest {
@Test
public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@ -3189,7 +3181,7 @@ public class TaskManagerTest {
}
private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final ProcessingMode processingMode) {
final TaskManager taskManager = setUpTaskManager(processingMode, null, false, false);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(processingMode, null, false);
final TopicPartition changelog = new TopicPartition("changelog", 0);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
@ -3344,7 +3336,7 @@ public class TaskManagerTest {
@Test
public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() {
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
@ -3510,7 +3502,7 @@ public class TaskManagerTest {
new ExceptionAndTask(new RuntimeException(), failedStatefulTask),
new ExceptionAndTask(new RuntimeException(), failedStandbyTask))
);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
taskManager.shutdown(true);
@ -3524,7 +3516,7 @@ public class TaskManagerTest {
@Test
public void shouldShutdownSchedulingTaskManager() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.shutdown(true);
@ -3573,7 +3565,7 @@ public class TaskManagerTest {
new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStatefulTaskDuringRemoval),
new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStandbyTaskDuringRemoval)
));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
futureForRemovedStatefulTask.complete(new StateUpdater.RemovedTaskResult(removedStatefulTask));
futureForRemovedStandbyTask.complete(new StateUpdater.RemovedTaskResult(removedStandbyTask));
futureForRemovedFailedStatefulTask
@ -3802,7 +3794,7 @@ public class TaskManagerTest {
allOffsets.putAll(offsetsT01);
allOffsets.putAll(offsetsT02);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
task01.setCommittableOffsetsAndMetadata(offsetsT01);
@ -4242,9 +4234,7 @@ public class TaskManagerTest {
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of());
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(List.of());
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
// one for stream and one for system time
assertThat(taskManager.punctuate(), equalTo(2));
@ -4524,7 +4514,7 @@ public class TaskManagerTest {
@Test
public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@ -4729,7 +4719,7 @@ public class TaskManagerTest {
@Test
public void shouldRecycleStartupTasksFromStateDirectoryAsActiveWithStateUpdater() {
final Tasks taskRegistry = new Tasks(new LogContext());
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build();
@ -4767,7 +4757,7 @@ public class TaskManagerTest {
@Test
public void shouldUseStartupTasksFromStateDirectoryAsStandbyWithStateUpdater() {
final Tasks taskRegistry = new Tasks(new LogContext());
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
@ -4797,18 +4787,6 @@ public class TaskManagerTest {
assertEquals(Collections.singletonMap(taskId00, startupTask), taskManager.standbyTaskMap());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void shouldStartStateUpdaterOnInit(final boolean stateUpdaterEnabled) {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, stateUpdaterEnabled);
taskManager.init();
if (stateUpdaterEnabled) {
verify(stateUpdater).start();
} else {
verify(stateUpdater, never()).start();
}
}
private static KafkaFutureImpl<DeletedRecords> completedFuture() {
final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
futureDeletedRecords.complete(null);