MINOR: Do not checkpoint standbys when handling corrupted tasks (#14709)

When a task is corrupted, uncorrupted tasks are committed. That is also true for standby tasks. Committing standby tasks actually means that they are checkpointed.

When the state updater is enabled, standbys are owned by the state updater. The stream thread should not checkpoint them when handling corrupted tasks. That is not a big limitation since the state updater periodically checkpoints standbys anyway. Additionally, when handling corrupted tasks, the important thing is to commit active running tasks to abort the transaction. Committing standby tasks does not abort any transaction.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Bruno Cadonna 2023-11-08 17:09:24 +01:00 committed by GitHub
parent 91fa196930
commit f1e58a35d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 3 deletions

View File

@ -231,7 +231,7 @@ public class TaskManager {
// We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
try {
final Collection<Task> tasksToCommit = allTasks()
final Collection<Task> tasksToCommit = tasks.allTasksPerId()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING)

View File

@ -2458,7 +2458,7 @@ public class TaskManagerTest {
}
@Test
public void shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitStandbyTasks() {
public void shouldNotCommitNonCorruptedRestoringActiveTasksAndNotCommitRunningStandbyTasksWithStateUpdaterEnabled() {
final StreamTask activeRestoringTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
@ -2479,7 +2479,44 @@ public class TaskManagerTest {
taskManager.handleCorruption(mkSet(taskId02));
Mockito.verify(activeRestoringTask, never()).commitNeeded();
Mockito.verify(standbyTask, times(2)).commitNeeded();
Mockito.verify(activeRestoringTask, never()).prepareCommit();
Mockito.verify(activeRestoringTask, never()).postCommit(Mockito.anyBoolean());
Mockito.verify(standbyTask, never()).commitNeeded();
Mockito.verify(standbyTask, never()).prepareCommit();
Mockito.verify(standbyTask, never()).postCommit(Mockito.anyBoolean());
verify(consumer);
}
@Test
public void shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitRunningStandbyTasksWithStateUpdaterDisabled() {
    // Fixture: the task reported as corrupted is an active stateful task in RUNNING.
    final StreamTask corruptedActive = statefulTask(taskId02, taskId02ChangelogPartitions)
        .withInputPartitions(taskId02Partitions)
        .inState(State.RUNNING)
        .build();

    // Fixture: a non-corrupted active task that is still RESTORING.
    final StreamTask restoringActive = statefulTask(taskId00, taskId00ChangelogPartitions)
        .withInputPartitions(taskId00Partitions)
        .inState(State.RESTORING)
        .build();

    // Fixture: a non-corrupted standby in RUNNING that reports it needs a commit.
    final StandbyTask runningStandby = standbyTask(taskId01, taskId01ChangelogPartitions)
        .withInputPartitions(taskId01Partitions)
        .inState(State.RUNNING)
        .build();
    when(runningStandby.commitNeeded()).thenReturn(true);

    // The mocked registry exposes all three tasks and resolves the corrupted one by id.
    final TasksRegistry registry = Mockito.mock(TasksRegistry.class);
    when(registry.task(taskId02)).thenReturn(corruptedActive);
    when(registry.allTasksPerId()).thenReturn(mkMap(
        mkEntry(taskId00, restoringActive),
        mkEntry(taskId01, runningStandby),
        mkEntry(taskId02, corruptedActive)
    ));

    // NOTE(review): the trailing `false` presumably switches the state updater off,
    // as the test name suggests -- confirm against setUpTaskManager.
    final TaskManager manager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, registry, false);

    expect(consumer.assignment())
        .andReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
    replay(consumer);

    // Exercise: report only taskId02 as corrupted.
    manager.handleCorruption(mkSet(taskId02));

    // The restoring active task must be left out of the commit entirely.
    Mockito.verify(restoringActive, never()).commitNeeded();
    Mockito.verify(restoringActive, never()).prepareCommit();
    Mockito.verify(restoringActive, never()).postCommit(Mockito.anyBoolean());

    // The running standby must go through the commit (prepare + post) exactly once.
    Mockito.verify(runningStandby).prepareCommit();
    Mockito.verify(runningStandby).postCommit(Mockito.anyBoolean());
}
@Test