diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index f3603b2a316..d8e3f41c20b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1249,9 +1249,15 @@ public class TaskManager { try { final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology); if (stateDirectory.lock(id)) { - lockedTaskDirectories.add(id); - if (!allTasks.containsKey(id)) { - log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id); + // Check again in case the cleaner thread ran and emptied the directory + if (stateDirectory.directoryForTaskIsEmpty(id)) { + log.debug("Releasing lock on empty directory for task {}", id); + stateDirectory.unlock(id); + } else { + lockedTaskDirectories.add(id); + if (!allTasks.containsKey(id)) { + log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id); + } } } } catch (final TaskIdFormatException e) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 21f5f9561d9..61e078868bf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1905,6 +1905,7 @@ public class TaskManagerTest { public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception { expectLockObtainedFor(taskId01); expectLockFailedFor(taskId10); + expectDirectoryNotEmpty(taskId01); makeTaskFolders( taskId01.toString(), @@ -1918,6 +1919,21 @@ public class TaskManagerTest { assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } + @Test + public void shouldUnlockEmptyDirsAtRebalanceStart() throws Exception { + expectLockObtainedFor(taskId01, taskId10); + expectDirectoryNotEmpty(taskId01); + expect(stateDirectory.directoryForTaskIsEmpty(taskId10)).andReturn(true); + expectUnlockFor(taskId10); + + makeTaskFolders(taskId01.toString(), taskId10.toString()); + replay(stateDirectory); + taskManager.handleRebalanceStart(singleton("topic")); + + verify(stateDirectory); + assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); + } + @Test public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() { final Set assigned = mkSet(t1p0, t1p1); @@ -1951,6 +1967,7 @@ public class TaskManagerTest { @Test public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception { expectLockObtainedFor(taskId00, taskId01, taskId02); + expectDirectoryNotEmpty(taskId00, taskId01, taskId02); expectUnlockFor(taskId02); makeTaskFolders( @@ -1993,6 +2010,7 @@ public class TaskManagerTest { when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask)); when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask)); expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03); + expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03); expectUnlockFor(taskId03); makeTaskFolders( taskId00.toString(), @@ -2121,6 +2139,7 @@ public class TaskManagerTest { private void computeOffsetSumAndVerify(final Map changelogOffsets, final Map expectedOffsetSums) throws Exception { expectLockObtainedFor(taskId00); + expectDirectoryNotEmpty(taskId00); makeTaskFolders(taskId00.toString()); replay(stateDirectory); @@ -2144,6 +2163,7 @@ public class TaskManagerTest { final Map expectedOffsetSums = mkMap(mkEntry(taskId00, 15L)); expectLockObtainedFor(taskId00); + expectDirectoryNotEmpty(taskId00); makeTaskFolders(taskId00.toString()); replay(stateDirectory); @@ -2243,6 +2263,7 @@ public class TaskManagerTest { public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception { expectLockObtainedFor(taskId00); makeTaskFolders(taskId00.toString()); + expectDirectoryNotEmpty(taskId00); expect(stateDirectory.checkpointFileFor(taskId00)).andReturn(getCheckpointFile(taskId00)); replay(stateDirectory); taskManager.handleRebalanceStart(singleton("topic")); @@ -2350,6 +2371,7 @@ public class TaskManagerTest { makeTaskFolders(taskId00.toString(), taskId01.toString()); expectLockObtainedFor(taskId00, taskId01); + expectDirectoryNotEmpty(taskId00, taskId01); // The second attempt will return empty tasks. makeTaskFolders(); @@ -4050,7 +4072,8 @@ public class TaskManagerTest { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); - makeTaskFolders(taskId00.toString(), task01.toString()); + makeTaskFolders(taskId00.toString(), taskId01.toString()); + expectDirectoryNotEmpty(taskId00, taskId01); expectLockObtainedFor(taskId00, taskId01); expectRestoreToBeCompleted(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))) @@ -4829,6 +4852,12 @@ public class TaskManagerTest { } } + private void expectDirectoryNotEmpty(final TaskId... tasks) { + for (final TaskId taskId : tasks) { + expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(false); + } + } + private static void expectConsumerAssignmentPaused(final Consumer consumer) { final Set assignment = singleton(new TopicPartition("assignment", 0)); expect(consumer.assignment()).andReturn(assignment); @@ -5119,6 +5148,7 @@ public class TaskManagerTest { Files.createFile(checkpointFile.toPath()); new OffsetCheckpoint(checkpointFile).write(offsets); expect(stateDirectory.checkpointFileFor(task)).andReturn(checkpointFile); + expectDirectoryNotEmpty(task); } private File getCheckpointFile(final TaskId task) {