rewrite to test same as previous

This commit is contained in:
Shashank Hosahalli Shivamurthy 2025-09-19 10:52:38 -07:00
parent 9daf9cdb0f
commit 06b65dc5d3
1 changed files with 24 additions and 14 deletions

View File

@ -2282,31 +2282,41 @@ public class TaskManagerTest {
}
@Test
public void shouldNotCommitCorruptedTasksOnTaskCorruptedException() {
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
final StreamTask corruptedTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RUNNING)
.build();
final TaskCorruptedException corruptedException = new TaskCorruptedException(singleton(taskId00));
final ExceptionAndTask exceptionAndTask = new ExceptionAndTask(corruptedException, corruptedTask);
when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(true);
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(singletonList(exceptionAndTask));
final StreamTask nonCorruptedTask = statefulTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING)
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.task(taskId00)).thenReturn(corruptedTask);
when(tasks.allTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedTask),
mkEntry(taskId01, nonCorruptedTask)
));
when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
when(nonCorruptedTask.commitNeeded()).thenReturn(true);
when(nonCorruptedTask.prepareCommit(true)).thenReturn(emptyMap());
when(corruptedTask.prepareCommit(false)).thenReturn(emptyMap());
doNothing().when(corruptedTask).postCommit(anyBoolean());
when(consumer.assignment()).thenReturn(taskId00Partitions);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
final TaskCorruptedException thrown = assertThrows(
TaskCorruptedException.class,
() -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
);
taskManager.handleCorruption(Set.of(taskId00));
assertEquals(singleton(taskId00), thrown.corruptedTasks());
assertEquals("Tasks [0_0] are corrupted and hence need to be re-initialized", thrown.getMessage());
verify(tasks).addFailedTask(corruptedTask);
verify(nonCorruptedTask).prepareCommit(true);
verify(nonCorruptedTask, never()).addPartitionsForOffsetReset(any());
verify(corruptedTask).addPartitionsForOffsetReset(taskId00Partitions);
verify(corruptedTask).changelogPartitions();
verify(corruptedTask).postCommit(true);
// check that we should not commit empty map either
verify(consumer, never()).commitSync(emptyMap());