KAFKA-19683: Remove dead tests and modify tests in TaskManagerTest [1/N] (#20501)

This is the first part of cleaning up of the tests in `TaskManagerTest`
- Removed dead tests
- Added new tests as suggested earlier

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Shashank 2025-09-16 11:46:20 -07:00 committed by GitHub
parent 9f657abf3a
commit b043ca2074
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 39 additions and 62 deletions

View File

@ -152,6 +152,7 @@ public class TaskManagerTest {
private final TopicPartition t1p1 = new TopicPartition(topic1, 1); private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
private final TopicPartition t2p2 = new TopicPartition(topic2, 1); private final TopicPartition t2p2 = new TopicPartition(topic2, 1);
private final TopicPartition t1p1changelog = new TopicPartition("changelog", 1); private final TopicPartition t1p1changelog = new TopicPartition("changelog", 1);
private final TopicPartition t1p1changelog2 = new TopicPartition("changelog2", 1);
private final Set<TopicPartition> taskId01Partitions = Set.of(t1p1); private final Set<TopicPartition> taskId01Partitions = Set.of(t1p1);
private final Set<TopicPartition> taskId01ChangelogPartitions = Set.of(t1p1changelog); private final Set<TopicPartition> taskId01ChangelogPartitions = Set.of(t1p1changelog);
private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = singletonMap(taskId01, taskId01Partitions); private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = singletonMap(taskId01, taskId01Partitions);
@ -218,6 +219,10 @@ public class TaskManagerTest {
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
} }
private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks) {
return setUpTaskManager(processingMode, tasks, false);
}
private TaskManager setUpTaskManager(final ProcessingMode processingMode, final boolean stateUpdaterEnabled) { private TaskManager setUpTaskManager(final ProcessingMode processingMode, final boolean stateUpdaterEnabled) {
return setUpTaskManager(processingMode, null, stateUpdaterEnabled, false); return setUpTaskManager(processingMode, null, stateUpdaterEnabled, false);
} }
@ -249,52 +254,6 @@ public class TaskManagerTest {
return taskManager; return taskManager;
} }
@Test
public void shouldClassifyExistingTasksWithoutStateUpdater() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, false);
final Map<TaskId, Set<TopicPartition>> runningActiveTasks = mkMap(mkEntry(taskId01, Set.of(t1p1)));
final Map<TaskId, Set<TopicPartition>> standbyTasks = mkMap(mkEntry(taskId02, Set.of(t2p2)));
final Map<TaskId, Set<TopicPartition>> restoringActiveTasks = mkMap(mkEntry(taskId03, Set.of(t1p3)));
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(runningActiveTasks);
activeTasks.putAll(restoringActiveTasks);
handleAssignment(runningActiveTasks, standbyTasks, restoringActiveTasks);
taskManager.handleAssignment(activeTasks, standbyTasks);
verifyNoInteractions(stateUpdater);
}
@Test
public void shouldNotUpdateExistingStandbyTaskIfStandbyIsReassignedWithSameInputPartitionWithoutStateUpdater() {
final StandbyTask standbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, taskId03Partitions);
verify(standbyTask, never()).updateInputPartitions(eq(taskId03Partitions), any());
}
@Test
public void shouldUpdateExistingStandbyTaskIfStandbyIsReassignedWithDifferentInputPartitionWithoutStateUpdater() {
final StandbyTask standbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, taskId04Partitions);
verify(standbyTask).updateInputPartitions(eq(taskId04Partitions), any());
}
private void updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(final Task standbyTask,
final Set<TopicPartition> newInputPartition) {
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(Set.of(standbyTask));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
taskManager.handleAssignment(
Collections.emptyMap(),
mkMap(mkEntry(standbyTask.id(), newInputPartition))
);
verify(standbyTask).resume();
}
@Test @Test
public void shouldLockAllTasksOnCorruptionWithProcessingThreads() { public void shouldLockAllTasksOnCorruptionWithProcessingThreads() {
@ -1853,14 +1812,20 @@ public class TaskManagerTest {
} }
@Test @Test
public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception { public void shouldComputeOffsetSumForRunningStatefulTask() {
final Map<TopicPartition, Long> changelogOffsets = mkMap( final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
mkEntry(new TopicPartition("changelog", 0), Task.LATEST_OFFSET), .inState(State.RUNNING).build();
mkEntry(new TopicPartition("changelog", 1), Task.LATEST_OFFSET) final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
); when(runningStatefulTask.changelogOffsets())
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, Task.LATEST_OFFSET)); .thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums); assertThat(
taskManager.taskOffsetSums(),
is(mkMap(mkEntry(taskId00, changelogOffsetOfRunningTask)))
);
} }
@Test @Test
@ -1911,14 +1876,14 @@ public class TaskManagerTest {
} }
@Test @Test
public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() { public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() {
final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING).build(); .inState(State.RUNNING).build();
final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions) final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RESTORING).build(); .inState(State.RESTORING).build();
final StandbyTask restoringStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) final StandbyTask restoringStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING).build(); .inState(State.RUNNING).build();
final long changelogOffsetOfRunningTask = 42L; final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
final long changelogOffsetOfRestoringStatefulTask = 24L; final long changelogOffsetOfRestoringStatefulTask = 24L;
final long changelogOffsetOfRestoringStandbyTask = 84L; final long changelogOffsetOfRestoringStandbyTask = 84L;
when(runningStatefulTask.changelogOffsets()) when(runningStatefulTask.changelogOffsets())
@ -1943,14 +1908,26 @@ public class TaskManagerTest {
} }
@Test @Test
public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception { public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() {
final Map<TopicPartition, Long> changelogOffsets = mkMap( final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
mkEntry(new TopicPartition("changelog", 0), OffsetCheckpoint.OFFSET_UNKNOWN), .inState(State.RESTORING).build();
mkEntry(new TopicPartition("changelog", 1), 10L) final long changelogOffsetOfRestoringStandbyTask = 84L;
); when(restoringStatefulTask.changelogOffsets())
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 10L)); .thenReturn(mkMap(
mkEntry(t1p1changelog, changelogOffsetOfRestoringStandbyTask),
mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, restoringStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums); assertThat(
taskManager.taskOffsetSums(),
is(mkMap(
mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask)
))
);
} }
private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> changelogOffsets, private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> changelogOffsets,