KAFKA-14299: Fix incorrect pauses in separate state restoration (#12743)

The original code path paused the main consumer for
all tasks before entering the restoration section
of the code, and then resumed all after restoration
has finished.

In the new state updater part of the code, tasks that
do not require restoration skip the restoration completely.
They remain with the TaskManger and are never transferred
to the StateUpdater, and thus are never resumed.

This change makes sure that tasks that remain with the
TaskManager are not paused.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Lucas Brutschy 2022-10-18 12:16:44 +02:00 committed by GitHub
parent 109b74c590
commit cc582897bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 1 deletions

View File

@ -175,7 +175,18 @@ public class TaskManager {
void handleRebalanceComplete() { void handleRebalanceComplete() {
// we should pause consumer only within the listener since // we should pause consumer only within the listener since
// before then the assignment has not been updated yet. // before then the assignment has not been updated yet.
mainConsumer.pause(mainConsumer.assignment()); if (stateUpdater == null) {
mainConsumer.pause(mainConsumer.assignment());
} else {
// All tasks that are owned by the task manager are ready and do not need to be paused
final Set<TopicPartition> partitionsNotToPause = tasks.allTasks()
.stream()
.flatMap(task -> task.inputPartitions().stream())
.collect(Collectors.toSet());
final Set<TopicPartition> partitionsToPause = new HashSet<>(mainConsumer.assignment());
partitionsToPause.removeAll(partitionsNotToPause);
mainConsumer.pause(partitionsToPause);
}
releaseLockedUnassignedTaskDirectories(); releaseLockedUnassignedTaskDirectories();

View File

@ -1481,6 +1481,36 @@ public class TaskManagerTest {
assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
} }
@Test
public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
expect(consumer.assignment()).andReturn(assigned);
consumer.pause(assigned);
replay(consumer);
taskManager.handleRebalanceComplete();
verify(consumer);
}
@Test
public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
expect(consumer.assignment()).andReturn(assigned);
consumer.pause(mkSet(t1p1));
replay(consumer);
taskManager.handleRebalanceComplete();
verify(consumer);
}
@Test @Test
public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception { public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
expectLockObtainedFor(taskId00, taskId01, taskId02); expectLockObtainedFor(taskId00, taskId01, taskId02);