KAFKA-15391: Handle concurrent dir rename which makes log-dir to be offline unexpectedly (#14280)

A race condition between async flush and segment rename (for deletion purpose) might cause the entire log directory to be marked offline when we delete a topic. This PR fixes the bug by ignoring NoSuchFileException when we flush a directory.

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Okada Haruki 2023-08-24 17:24:01 +09:00 committed by Divij Vaidya
parent 714b4ebeeb
commit fffbab7951
3 changed files with 34 additions and 2 deletions

View File

@ -976,6 +976,19 @@ public final class Utils {
}
}
/**
* Flushes dirty directories to guarantee crash consistency with swallowing {@link NoSuchFileException}
*
* @throws IOException if flushing the directory fails.
*/
public static void flushDirIfExists(Path path) throws IOException {
try {
flushDir(path);
} catch (NoSuchFileException e) {
log.warn("Failed to flush directory {}", path);
}
}
/**
* Closes all the provided closeables.
* @throws IOException if any of the close methods throws an IOException.

View File

@ -173,8 +173,11 @@ class LocalLog(@volatile private var _dir: File,
val segmentsToFlush = segments.values(currentRecoveryPoint, offset)
segmentsToFlush.foreach(_.flush())
// If there are any new segments, we need to flush the parent directory for crash consistency.
if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint))
Utils.flushDir(dir.toPath)
if (segmentsToFlush.exists(_.baseOffset >= currentRecoveryPoint)) {
// The directory might be renamed concurrently for topic deletion, which may cause NoSuchFileException here.
// Since the directory is to be deleted anyways, we just swallow NoSuchFileException and let it go.
Utils.flushDirIfExists(dir.toPath)
}
}
}

View File

@ -31,7 +31,9 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.Mockito.{doReturn, spy}
import scala.jdk.CollectionConverters._
@ -701,6 +703,20 @@ class LocalLogTest {
assertThrows(classOf[KafkaException], () => log.roll())
}
@Test
def testFlushingNonExistentDir(): Unit = {
val spyLog = spy(log)
val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
appendRecords(List(record))
mockTime.sleep(1)
val newSegment = log.roll()
// simulate the directory is renamed concurrently
doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir
assertDoesNotThrow((() => spyLog.flush(newSegment.baseOffset)): Executable)
}
private def createLocalLogWithActiveSegment(dir: File = logDir,
config: LogConfig,
segments: LogSegments = new LogSegments(topicPartition),