mirror of https://github.com/apache/kafka.git
KAFKA-16025: Fix orphaned locks when rebalancing and store cleanup race on unassigned task directories (#15088)
KAFKA-16025 describes the race condition sequence in detail. When it occurs, it can cause the impacted task's initialization to block indefinitely, blocking progress on the impacted task and on any other task assigned to the same stream thread. The fix is simple: after locking a task directory at the start of a rebalance, re-check whether the directory is empty (the state cleaner thread may have emptied it in the meantime) and, if it is, unlock it immediately. This preserves the idempotency of the method when it coincides with parallel state store cleanup executions. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
parent
b2bfd5d110
commit
f1a0207cbb
|
@ -1249,9 +1249,15 @@ public class TaskManager {
|
||||||
try {
|
try {
|
||||||
final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology);
|
final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology);
|
||||||
if (stateDirectory.lock(id)) {
|
if (stateDirectory.lock(id)) {
|
||||||
lockedTaskDirectories.add(id);
|
// Check again in case the cleaner thread ran and emptied the directory
|
||||||
if (!allTasks.containsKey(id)) {
|
if (stateDirectory.directoryForTaskIsEmpty(id)) {
|
||||||
log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
|
log.debug("Releasing lock on empty directory for task {}", id);
|
||||||
|
stateDirectory.unlock(id);
|
||||||
|
} else {
|
||||||
|
lockedTaskDirectories.add(id);
|
||||||
|
if (!allTasks.containsKey(id)) {
|
||||||
|
log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (final TaskIdFormatException e) {
|
} catch (final TaskIdFormatException e) {
|
||||||
|
|
|
@ -1905,6 +1905,7 @@ public class TaskManagerTest {
|
||||||
public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
|
public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
|
||||||
expectLockObtainedFor(taskId01);
|
expectLockObtainedFor(taskId01);
|
||||||
expectLockFailedFor(taskId10);
|
expectLockFailedFor(taskId10);
|
||||||
|
expectDirectoryNotEmpty(taskId01);
|
||||||
|
|
||||||
makeTaskFolders(
|
makeTaskFolders(
|
||||||
taskId01.toString(),
|
taskId01.toString(),
|
||||||
|
@ -1918,6 +1919,21 @@ public class TaskManagerTest {
|
||||||
assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
|
assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldUnlockEmptyDirsAtRebalanceStart() throws Exception {
|
||||||
|
expectLockObtainedFor(taskId01, taskId10);
|
||||||
|
expectDirectoryNotEmpty(taskId01);
|
||||||
|
expect(stateDirectory.directoryForTaskIsEmpty(taskId10)).andReturn(true);
|
||||||
|
expectUnlockFor(taskId10);
|
||||||
|
|
||||||
|
makeTaskFolders(taskId01.toString(), taskId10.toString());
|
||||||
|
replay(stateDirectory);
|
||||||
|
taskManager.handleRebalanceStart(singleton("topic"));
|
||||||
|
|
||||||
|
verify(stateDirectory);
|
||||||
|
assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
|
public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
|
||||||
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
|
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
|
||||||
|
@ -1951,6 +1967,7 @@ public class TaskManagerTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
|
public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
|
||||||
expectLockObtainedFor(taskId00, taskId01, taskId02);
|
expectLockObtainedFor(taskId00, taskId01, taskId02);
|
||||||
|
expectDirectoryNotEmpty(taskId00, taskId01, taskId02);
|
||||||
expectUnlockFor(taskId02);
|
expectUnlockFor(taskId02);
|
||||||
|
|
||||||
makeTaskFolders(
|
makeTaskFolders(
|
||||||
|
@ -1993,6 +2010,7 @@ public class TaskManagerTest {
|
||||||
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask));
|
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask));
|
||||||
when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
|
when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
|
||||||
expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
|
expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
|
||||||
|
expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
|
||||||
expectUnlockFor(taskId03);
|
expectUnlockFor(taskId03);
|
||||||
makeTaskFolders(
|
makeTaskFolders(
|
||||||
taskId00.toString(),
|
taskId00.toString(),
|
||||||
|
@ -2121,6 +2139,7 @@ public class TaskManagerTest {
|
||||||
private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> changelogOffsets,
|
private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> changelogOffsets,
|
||||||
final Map<TaskId, Long> expectedOffsetSums) throws Exception {
|
final Map<TaskId, Long> expectedOffsetSums) throws Exception {
|
||||||
expectLockObtainedFor(taskId00);
|
expectLockObtainedFor(taskId00);
|
||||||
|
expectDirectoryNotEmpty(taskId00);
|
||||||
makeTaskFolders(taskId00.toString());
|
makeTaskFolders(taskId00.toString());
|
||||||
replay(stateDirectory);
|
replay(stateDirectory);
|
||||||
|
|
||||||
|
@ -2144,6 +2163,7 @@ public class TaskManagerTest {
|
||||||
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));
|
final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 15L));
|
||||||
|
|
||||||
expectLockObtainedFor(taskId00);
|
expectLockObtainedFor(taskId00);
|
||||||
|
expectDirectoryNotEmpty(taskId00);
|
||||||
makeTaskFolders(taskId00.toString());
|
makeTaskFolders(taskId00.toString());
|
||||||
replay(stateDirectory);
|
replay(stateDirectory);
|
||||||
|
|
||||||
|
@ -2243,6 +2263,7 @@ public class TaskManagerTest {
|
||||||
public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception {
|
public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception {
|
||||||
expectLockObtainedFor(taskId00);
|
expectLockObtainedFor(taskId00);
|
||||||
makeTaskFolders(taskId00.toString());
|
makeTaskFolders(taskId00.toString());
|
||||||
|
expectDirectoryNotEmpty(taskId00);
|
||||||
expect(stateDirectory.checkpointFileFor(taskId00)).andReturn(getCheckpointFile(taskId00));
|
expect(stateDirectory.checkpointFileFor(taskId00)).andReturn(getCheckpointFile(taskId00));
|
||||||
replay(stateDirectory);
|
replay(stateDirectory);
|
||||||
taskManager.handleRebalanceStart(singleton("topic"));
|
taskManager.handleRebalanceStart(singleton("topic"));
|
||||||
|
@ -2350,6 +2371,7 @@ public class TaskManagerTest {
|
||||||
|
|
||||||
makeTaskFolders(taskId00.toString(), taskId01.toString());
|
makeTaskFolders(taskId00.toString(), taskId01.toString());
|
||||||
expectLockObtainedFor(taskId00, taskId01);
|
expectLockObtainedFor(taskId00, taskId01);
|
||||||
|
expectDirectoryNotEmpty(taskId00, taskId01);
|
||||||
|
|
||||||
// The second attempt will return empty tasks.
|
// The second attempt will return empty tasks.
|
||||||
makeTaskFolders();
|
makeTaskFolders();
|
||||||
|
@ -4050,7 +4072,8 @@ public class TaskManagerTest {
|
||||||
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
|
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
|
||||||
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
|
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
|
||||||
|
|
||||||
makeTaskFolders(taskId00.toString(), task01.toString());
|
makeTaskFolders(taskId00.toString(), taskId01.toString());
|
||||||
|
expectDirectoryNotEmpty(taskId00, taskId01);
|
||||||
expectLockObtainedFor(taskId00, taskId01);
|
expectLockObtainedFor(taskId00, taskId01);
|
||||||
expectRestoreToBeCompleted(consumer);
|
expectRestoreToBeCompleted(consumer);
|
||||||
when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
|
when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
|
||||||
|
@ -4829,6 +4852,12 @@ public class TaskManagerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void expectDirectoryNotEmpty(final TaskId... tasks) {
|
||||||
|
for (final TaskId taskId : tasks) {
|
||||||
|
expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void expectConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
|
private static void expectConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
|
||||||
final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
|
final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
|
||||||
expect(consumer.assignment()).andReturn(assignment);
|
expect(consumer.assignment()).andReturn(assignment);
|
||||||
|
@ -5119,6 +5148,7 @@ public class TaskManagerTest {
|
||||||
Files.createFile(checkpointFile.toPath());
|
Files.createFile(checkpointFile.toPath());
|
||||||
new OffsetCheckpoint(checkpointFile).write(offsets);
|
new OffsetCheckpoint(checkpointFile).write(offsets);
|
||||||
expect(stateDirectory.checkpointFileFor(task)).andReturn(checkpointFile);
|
expect(stateDirectory.checkpointFileFor(task)).andReturn(checkpointFile);
|
||||||
|
expectDirectoryNotEmpty(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
private File getCheckpointFile(final TaskId task) {
|
private File getCheckpointFile(final TaskId task) {
|
||||||
|
|
Loading…
Reference in New Issue