mirror of https://github.com/apache/kafka.git
KAFKA-10199: Remove tasks from state updater on shutdown (#12562)
The state updater removes its updating and paused tasks on shutdown. The removed tasks are added to the output queue for removed tasks. Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
This commit is contained in:
parent
0e6a3fa978
commit
7b07b2676b
|
|
@ -60,6 +60,7 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
private final ChangelogReader changelogReader;
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean(true);
|
||||
private final Map<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
|
||||
private final Map<TaskId, Task> pausedTasks = new ConcurrentHashMap<>();
|
||||
private final Logger log;
|
||||
|
||||
public StateUpdaterThread(final String name, final ChangelogReader changelogReader) {
|
||||
|
|
@ -86,6 +87,10 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive);
|
||||
}
|
||||
|
||||
/**
 * Returns the tasks currently held in {@code pausedTasks}.
 * The result is the map's own values view; no copy is made here.
 */
public Collection<Task> getPausedTasks() {
|
||||
return pausedTasks.values();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
log.info("State updater thread started");
|
||||
|
|
@ -100,7 +105,8 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
} catch (final RuntimeException anyOtherException) {
|
||||
handleRuntimeException(anyOtherException);
|
||||
} finally {
|
||||
clear();
|
||||
removeAddedTasksFromInputQueue();
|
||||
removeUpdatingAndPausedTasks();
|
||||
shutdownGate.countDown();
|
||||
log.info("State updater thread shutdown");
|
||||
}
|
||||
|
|
@ -224,18 +230,16 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
}
|
||||
}
|
||||
|
||||
private void clear() {
|
||||
tasksAndActionsLock.lock();
|
||||
restoredActiveTasksLock.lock();
|
||||
try {
|
||||
tasksAndActions.clear();
|
||||
restoredActiveTasks.clear();
|
||||
} finally {
|
||||
restoredActiveTasksLock.unlock();
|
||||
tasksAndActionsLock.unlock();
|
||||
}
|
||||
private void removeUpdatingAndPausedTasks() {
|
||||
changelogReader.clear();
|
||||
updatingTasks.clear();
|
||||
updatingTasks.forEach((id, task) -> {
|
||||
task.maybeCheckpoint(true);
|
||||
removedTasks.add(task);
|
||||
});
|
||||
pausedTasks.forEach((id, task) -> {
|
||||
removedTasks.add(task);
|
||||
});
|
||||
pausedTasks.clear();
|
||||
}
|
||||
|
||||
private List<TaskAndAction> getTasksAndActions() {
|
||||
|
|
@ -388,7 +392,6 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
|
||||
private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
|
||||
private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
|
||||
private final Map<TaskId, Task> pausedTasks = new ConcurrentHashMap<>();
|
||||
|
||||
private final long commitIntervalMs;
|
||||
private long lastCommitMs;
|
||||
|
|
@ -427,6 +430,23 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
stateUpdaterThread = null;
|
||||
} catch (final InterruptedException ignored) {
|
||||
}
|
||||
} else {
|
||||
removeAddedTasksFromInputQueue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Empties the input queue {@code tasksAndActions} while holding {@code tasksAndActionsLock}.
 * The task of every queued {@code Action.ADD} entry is put into {@code removedTasks};
 * entries with any other action are dropped without further handling.
 */
private void removeAddedTasksFromInputQueue() {
|
||||
tasksAndActionsLock.lock();
|
||||
try {
|
||||
TaskAndAction taskAndAction;
|
||||
// Inspect the head with peek() first; it is only dequeued by poll() at the end of the iteration.
while ((taskAndAction = tasksAndActions.peek()) != null) {
|
||||
if (taskAndAction.getAction() == Action.ADD) {
|
||||
removedTasks.add(taskAndAction.getTask());
|
||||
}
|
||||
// Drop the entry regardless of its action so the loop ends once the queue is empty.
tasksAndActions.poll();
|
||||
}
|
||||
} finally {
|
||||
// Released in finally so the lock is not left held if the loop throws.
tasksAndActionsLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -557,7 +577,9 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
}
|
||||
|
||||
public Set<Task> getPausedTasks() {
|
||||
return Collections.unmodifiableSet(new HashSet<>(pausedTasks.values()));
|
||||
return stateUpdaterThread != null
|
||||
? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.getPausedTasks()))
|
||||
: Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -85,22 +85,22 @@ class DefaultStateUpdaterTest {
|
|||
|
||||
// need an auto-tick timer to work for draining with timeout
|
||||
private final Time time = new MockTime(1L);
|
||||
private final StreamsConfig config = new StreamsConfig(configProps());
|
||||
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
|
||||
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
|
||||
private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
|
||||
private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
stateUpdater.shutdown(Duration.ofMinutes(1));
|
||||
}
|
||||
|
||||
private Properties configProps() {
|
||||
private Properties configProps(final int commitInterval) {
|
||||
return mkObjectProperties(mkMap(
|
||||
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
|
||||
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
|
||||
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
|
||||
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL),
|
||||
mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), COMMIT_INTERVAL)
|
||||
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval),
|
||||
mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), commitInterval)
|
||||
));
|
||||
}
|
||||
|
||||
|
|
@ -126,6 +126,80 @@ class DefaultStateUpdaterTest {
|
|||
verify(changelogReader, times(2)).clear();
|
||||
}
|
||||
|
||||
// Enqueues pause/add/remove/resume actions with no start() call in this test, checks that
// nothing is reported as removed yet, then shuts down and expects the three added tasks
// to be reported by verifyRemovedTasks.
@Test
|
||||
public void shouldRemoveTasksFromAndClearInputQueueOnShutdown() throws Exception {
|
||||
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
|
||||
final StreamTask statelessTask = statelessTask(TASK_0_0).inState(State.RESTORING).build();
|
||||
final StreamTask statefulTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
|
||||
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
|
||||
// Non-ADD actions are mixed in between the adds; none of their task ids is expected in the removed output.
stateUpdater.pause(TASK_0_1);
|
||||
stateUpdater.add(statelessTask);
|
||||
stateUpdater.add(statefulTask);
|
||||
stateUpdater.remove(TASK_1_1);
|
||||
stateUpdater.add(standbyTask);
|
||||
stateUpdater.resume(TASK_0_1);
|
||||
verifyRemovedTasks();
|
||||
|
||||
stateUpdater.shutdown(Duration.ofMinutes(1));
|
||||
|
||||
verifyRemovedTasks(statelessTask, statefulTask, standbyTask);
|
||||
}
|
||||
|
||||
// Adds one active and one standby task to a started state updater, waits until both are
// updating, then shuts down and expects both tasks to be reported as removed and
// maybeCheckpoint(true) to have been called on each.
@Test
|
||||
public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
|
||||
// Shut down the default instance before replacing it with one built from a different config.
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
|
||||
// NOTE(review): Integer.MAX_VALUE commit interval presumably keeps interval-based checkpoints
// from firing, so the verified maybeCheckpoint(true) calls come from shutdown — confirm.
stateUpdater = new DefaultStateUpdater(new StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, time);
|
||||
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
|
||||
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
|
||||
// The mocked changelog reader never reports a completed changelog.
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
|
||||
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
|
||||
stateUpdater.start();
|
||||
stateUpdater.add(activeTask);
|
||||
stateUpdater.add(standbyTask);
|
||||
verifyUpdatingTasks(activeTask, standbyTask);
|
||||
verifyRemovedTasks();
|
||||
|
||||
stateUpdater.shutdown(Duration.ofMinutes(1));
|
||||
|
||||
verifyRemovedTasks(activeTask, standbyTask);
|
||||
verify(activeTask).maybeCheckpoint(true);
|
||||
verify(standbyTask).maybeCheckpoint(true);
|
||||
}
|
||||
|
||||
// Adds one active and one standby task, pauses both, then shuts down and expects both
// paused tasks to be reported as removed.
@Test
|
||||
public void shouldRemovePausedTasksOnShutdown() throws Exception {
|
||||
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
|
||||
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
|
||||
stateUpdater.start();
|
||||
stateUpdater.add(activeTask);
|
||||
stateUpdater.add(standbyTask);
|
||||
stateUpdater.pause(activeTask.id());
|
||||
stateUpdater.pause(standbyTask.id());
|
||||
// Both tasks must be paused and nothing removed before shutdown is triggered.
verifyPausedTasks(activeTask, standbyTask);
|
||||
verifyRemovedTasks();
|
||||
|
||||
stateUpdater.shutdown(Duration.ofMinutes(1));
|
||||
|
||||
verifyRemovedTasks(activeTask, standbyTask);
|
||||
}
|
||||
|
||||
// Adds one active and one standby task and pauses only the standby task, so that one task is
// paused and one is updating at shutdown; expects both to be reported as removed afterwards.
@Test
|
||||
public void shouldRemovePausedAndUpdatingTasksOnShutdown() throws Exception {
|
||||
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
|
||||
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
|
||||
stateUpdater.start();
|
||||
stateUpdater.add(activeTask);
|
||||
stateUpdater.add(standbyTask);
|
||||
stateUpdater.pause(standbyTask.id());
|
||||
// Confirm the mixed paused/updating state and an empty removed output before shutting down.
verifyPausedTasks(standbyTask);
|
||||
verifyUpdatingTasks(activeTask);
|
||||
verifyRemovedTasks();
|
||||
|
||||
stateUpdater.shutdown(Duration.ofMinutes(1));
|
||||
|
||||
verifyRemovedTasks(activeTask, standbyTask);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowIfStatelessTaskNotInStateRestoring() {
|
||||
shouldThrowIfActiveTaskNotInStateRestoring(statelessTask(TASK_0_0).build());
|
||||
|
|
@ -1287,8 +1361,6 @@ class DefaultStateUpdaterTest {
|
|||
stateUpdater.add(standbyTask3);
|
||||
verifyUpdatingTasks(activeTask1, activeTask2, standbyTask1, standbyTask2, standbyTask3);
|
||||
|
||||
final Set<Task> tasks = stateUpdater.getTasks();
|
||||
|
||||
verifyGetTasks(mkSet(activeTask1, activeTask2), mkSet(standbyTask1, standbyTask2, standbyTask3));
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue