KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (#9373)

Avoid continuous repeated logging by not trying to clean empty task directories, which are longer fully deleted during internal cleanup as of https://issues.apache.org/jira/browse/KAFKA-6647.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Michael Bingham 2020-10-07 16:48:35 -06:00 committed by GitHub
parent 65c29a9dec
commit 250c71b532
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 1 deletions

View File

@ -315,7 +315,7 @@ public class StateDirectory {
}
private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
for (final File taskDir : listAllTaskDirectories()) {
for (final File taskDir : listNonEmptyTaskDirectories()) {
final String dirName = taskDir.getName();
final TaskId id = TaskId.parse(dirName);
if (!locks.containsKey(id)) {

View File

@ -50,9 +50,11 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -286,6 +288,7 @@ public class StateDirectoryTest {
}
}
@Test
public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
final File dir = directory.directoryForTask(new TaskId(2, 0));
@ -304,6 +307,36 @@ public class StateDirectoryTest {
assertEquals(0, directory.listNonEmptyTaskDirectories().length);
}
@Test
public void shouldCleanupObsoleteStateDirectoriesOnlyOnce() {
final File dir = directory.directoryForTask(new TaskId(2, 0));
assertTrue(new File(dir, "store").mkdir());
assertEquals(1, directory.listAllTaskDirectories().length);
assertEquals(1, directory.listNonEmptyTaskDirectories().length);
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
directory.cleanRemovedTasks(0);
assertTrue(dir.exists());
assertEquals(1, directory.listAllTaskDirectories().length);
assertEquals(0, directory.listNonEmptyTaskDirectories().length);
assertThat(
appender.getMessages(),
hasItem(containsString("Deleting obsolete state directory"))
);
}
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
directory.cleanRemovedTasks(0);
assertTrue(dir.exists());
assertEquals(1, directory.listAllTaskDirectories().length);
assertEquals(0, directory.listNonEmptyTaskDirectories().length);
assertThat(
appender.getMessages(),
not(hasItem(containsString("Deleting obsolete state directory")))
);
}
}
@Test
public void shouldNotRemoveNonTaskDirectoriesAndFiles() {
final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo");