KAFKA-10199: Remove tasks from state updater on revocation (#12520)

Removes tasks from the state updater when the input partitions of the tasks are revoked during a rebalance.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bruno Cadonna 2022-08-17 20:13:34 +02:00 committed by GitHub
parent 9f20f89953
commit b47c4d8598
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 97 additions and 0 deletions

View File

@ -757,6 +757,8 @@ public class TaskManager {
}
}
removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
if (!remainingRevokedPartitions.isEmpty()) {
log.debug("The following revoked partitions {} are missing from the current task partitions. It could "
+ "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " +
@ -842,6 +844,20 @@ public class TaskManager {
}
}
private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remainingRevokedPartitions) {
if (stateUpdater != null) {
for (final Task restoringTask : stateUpdater.getTasks()) {
if (restoringTask.isActive()) {
if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
tasks.addPendingTaskToClose(restoringTask.id());
stateUpdater.remove(restoringTask.id());
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
}
}
}
}
}
private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
for (final Task task : tasksToPrepare) {

View File

@ -751,6 +751,87 @@ public class TaskManagerTest {
assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
}
@Test
public void shouldRemoveStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() {
final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final TaskManager taskManager = setupForRevocation(mkSet(task), mkSet(task));
taskManager.handleRevocation(taskId00Partitions);
Mockito.verify(stateUpdater).remove(task.id());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
Mockito.verify(task).closeClean();
}
public void shouldRemoveMultipleStatefulTaskWithRevokedInputPartitionsFromStateUpdaterOnRevocation() {
final StreamTask task1 = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final StreamTask task2 = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId01Partitions).build();
final TaskManager taskManager = setupForRevocation(mkSet(task1, task2), mkSet(task1, task2));
taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions));
Mockito.verify(stateUpdater).remove(task1.id());
Mockito.verify(stateUpdater).remove(task2.id());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
Mockito.verify(task1).closeClean();
Mockito.verify(task2).closeClean();
}
@Test
public void shouldNotRemoveStatefulTaskWithoutRevokedInputPartitionsFromStateUpdaterOnRevocation() {
final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final TaskManager taskManager = setupForRevocation(mkSet(task), Collections.emptySet());
taskManager.handleRevocation(taskId01Partitions);
Mockito.verify(stateUpdater, never()).remove(task.id());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
Mockito.verify(task, never()).closeClean();
}
@Test
public void shouldNotRemoveStandbyTaskFromStateUpdaterOnRevocation() {
final StandbyTask task = standbyTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId00Partitions).build();
final TaskManager taskManager = setupForRevocation(mkSet(task), Collections.emptySet());
taskManager.handleRevocation(taskId00Partitions);
Mockito.verify(stateUpdater, never()).remove(task.id());
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
Mockito.verify(task, never()).closeClean();
}
private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
final Set<Task> removedTasks) {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
when(stateUpdater.getTasks()).thenReturn(tasksInStateUpdater);
when(stateUpdater.drainRemovedTasks()).thenReturn(removedTasks);
expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
consumer.resume(anyObject());
expectLastCall().anyTimes();
replay(consumer);
return taskManager;
}
@Test
public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true);