mirror of https://github.com/apache/kafka.git
KAFKA-17402: DefaultStateUpdated should transite task atomically (#18607)
Reviewers: Bruno Cadonna <bruno@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
6612dd5c0b
commit
7f9cf895d0
|
|
@ -673,7 +673,6 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
measureCheckpointLatency(() -> task.maybeCheckpoint(true));
|
||||
changelogReader.unregister(changelogPartitions);
|
||||
addToRestoredTasks(task);
|
||||
updatingTasks.remove(task.id());
|
||||
log.info("Stateful active task " + task.id() + " completed restoration");
|
||||
transitToUpdateStandbysIfOnlyStandbysLeft();
|
||||
}
|
||||
|
|
@ -689,6 +688,7 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
restoredActiveTasksLock.lock();
|
||||
try {
|
||||
restoredActiveTasks.add(task);
|
||||
updatingTasks.remove(task.id());
|
||||
log.debug("Active task " + task.id() + " was added to the restored tasks");
|
||||
restoredActiveTasksCondition.signalAll();
|
||||
} finally {
|
||||
|
|
|
|||
Loading…
Reference in New Issue