mirror of https://github.com/apache/kafka.git
KAFKA-15375: fix broken clean shutdown detection logic in LogManager (#14239)

When running in KRaft mode, LogManager.startup is called on a different thread than the main broker startup thread (by BrokerMetadataPublisher, when the first metadata update is received). If a fatal error occurs during broker startup, before LogManager.startup has completed, LogManager.shutdown may improperly mark log dirs as cleanly shut down. This PR makes the following changes:

1. During LogManager startup:
   - track the hadCleanShutdown flag for each log dir
   - track the loadLogsCompleted status for each log dir
2. During LogManager shutdown:
   - do not write the clean shutdown marker file for log dirs that have hadCleanShutdown == false and loadLogsCompleted == false

Reviewers: Colin P. McCabe <cmccabe@apache.org>
parent a4098bfd16
commit 8544dba721
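Before the hunks, the shape of the fix in one place: record per-dir state at startup, then gate the marker write at shutdown on that state. Below is a minimal standalone sketch of the pattern; the names `CleanShutdownGuard`, `onStartup`, and `mayWriteMarker` are illustrative, not LogManager's actual API.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified model of the per-dir state LogManager now tracks.
object CleanShutdownGuard {
  private val hadCleanShutdownFlags = new ConcurrentHashMap[String, Boolean]()
  private val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()

  // At startup: record what was observed for each log dir.
  def onStartup(dir: String, hadCleanShutdown: Boolean, allLogsLoaded: Boolean): Unit = {
    hadCleanShutdownFlags.put(dir, hadCleanShutdown)
    loadLogsCompletedFlags.put(dir, allLogsLoaded)
  }

  // At shutdown: a dir may be marked clean only if it started clean or
  // finished loading. A dir startup never reached defaults to false/false,
  // so an early shutdown cannot mark it clean.
  def mayWriteMarker(dir: String): Boolean =
    hadCleanShutdownFlags.getOrDefault(dir, false) ||
      loadLogsCompletedFlags.getOrDefault(dir, false)
}
```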
@@ -124,6 +124,12 @@ class LogManager(logDirs: Seq[File],
     logDirsSet
   }
 
+  // A map that stores hadCleanShutdown flag for each log dir.
+  private val hadCleanShutdownFlags = new ConcurrentHashMap[String, Boolean]()
+
+  // A map that tells whether all logs in a log dir had been loaded or not at startup time.
+  private val loadLogsCompletedFlags = new ConcurrentHashMap[String, Boolean]()
+
   @volatile private var _cleaner: LogCleaner = _
   private[kafka] def cleaner: LogCleaner = _cleaner
 
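Both maps are ConcurrentHashMaps because they are written by the log-loading jobs (which run on per-dir recovery thread pools) and read by the shutdown path. The shutdown check later relies on getOrDefault, a standard java.util.Map method, so a dir that startup never reached falls back to false. A quick illustration (the paths are made up):

```scala
import java.util.concurrent.ConcurrentHashMap

val flags = new ConcurrentHashMap[String, Boolean]()
flags.put("/data/kafka-logs-0", true)

// Present key: the stored value wins.
assert(flags.getOrDefault("/data/kafka-logs-0", false))
// Absent key (startup never got to this dir): defaults to false,
// i.e. "do not write a clean shutdown marker".
assert(!flags.getOrDefault("/data/kafka-logs-1", false))
```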
@@ -368,6 +374,7 @@
       // log recovery itself is being performed by `Log` class during initialization
       info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
     }
+    hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
 
     var recoveryPoints = Map[TopicPartition, Long]()
     try {
@@ -390,7 +397,8 @@
     val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
       logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
     numTotalLogs += logsToLoad.length
-    numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
+    numRemainingLogs.put(logDirAbsolutePath, logsToLoad.length)
+    loadLogsCompletedFlags.put(logDirAbsolutePath, logsToLoad.isEmpty)
 
     val jobsForDir = logsToLoad.map { logDir =>
       val runnable: Runnable = () => {
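Note the initial value: loadLogsCompletedFlags starts as `logsToLoad.isEmpty`, so a log dir with nothing to load counts as fully loaded from the start and can still receive a clean shutdown marker even if shutdown races with the rest of startup. Dirs that do have logs start at false and are flipped only once the last load job finishes, as the next hunk shows.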
@@ -408,13 +416,18 @@
           // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
         } finally {
           val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
-          val remainingLogs = decNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath)
+          val remainingLogs = decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath)
           val currentNumLoaded = logsToLoad.length - remainingLogs
           log match {
             case Some(loadedLog) => info(s"Completed load of $loadedLog with ${loadedLog.numberOfSegments} segments in ${logLoadDurationMs}ms " +
               s"($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
             case None => info(s"Error while loading logs in $logDir in ${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in $logDirAbsolutePath)")
           }
+
+          if (remainingLogs == 0) {
+            // loadLog is completed for all logs under the logDir; mark it.
+            loadLogsCompletedFlags.put(logDirAbsolutePath, true)
+          }
         }
       }
       runnable
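The flag is flipped by whichever load job brings the remaining count for its dir to zero, so the decrement must be atomic. A sketch of that pattern, assuming decNumRemainingLogs is a compute-based atomic decrement (a stand-in for Kafka's helper of the same name; the path is made up):

```scala
import java.util.concurrent.ConcurrentHashMap

// Atomic decrement-and-read over a shared per-dir counter map.
def decNumRemainingLogs(counts: ConcurrentHashMap[String, Int], path: String): Int =
  // compute is atomic per key, so concurrent load jobs cannot lose a decrement.
  counts.compute(path, (_, oldVal) => oldVal - 1)

val counts = new ConcurrentHashMap[String, Int]()
counts.put("/data/dir-a", 2)
// Each load job decrements exactly once in its finally block; only the job
// that observes 0 marks the dir as fully loaded.
if (decNumRemainingLogs(counts, "/data/dir-a") == 0) {
  // loadLogsCompletedFlags.put("/data/dir-a", true)
}
```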
@@ -611,9 +624,15 @@
         debug(s"Updating log start offsets at $dir")
         checkpointLogStartOffsetsInDir(dir, logs)
 
-        // mark that the shutdown was clean by creating marker file
-        debug(s"Writing clean shutdown marker at $dir")
-        CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
+        // mark that the shutdown was clean by creating marker file for log dirs that:
+        // 1. had clean shutdown marker file; or
+        // 2. had no clean shutdown marker file, but all logs under it have been recovered at startup time
+        val logDirAbsolutePath = dir.getAbsolutePath
+        if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
+            loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
+          debug(s"Writing clean shutdown marker at $dir")
+          CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
+        }
       }
     }
   } finally {
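The OR of the two flags mirrors the two cases in the comment above: a dir that started from a clean marker loaded its logs without recovery, so an interrupted load leaves them as they were on disk, while a dir that started dirty earns the marker only by completing recovery of every log. Only a dir that both started dirty and never finished loading is left unmarked, which forces recovery again on the next start instead of silently skipping it, the bug this change fixes.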
@@ -134,6 +134,59 @@ class LogManagerTest {
     }
   }
 
+  /*
+   * Test that LogManager.shutdown() doesn't create a clean shutdown file for a log directory that has not completed
+   * recovery.
+   */
+  @Test
+  def testCleanShutdownFileWhenShutdownCalledBeforeStartupComplete(): Unit = {
+    // 1. create two logs under logDir
+    val topicPartition0 = new TopicPartition(name, 0)
+    val topicPartition1 = new TopicPartition(name, 1)
+    val log0 = logManager.getOrCreateLog(topicPartition0, topicId = None)
+    val log1 = logManager.getOrCreateLog(topicPartition1, topicId = None)
+    val logFile0 = new File(logDir, name + "-0")
+    val logFile1 = new File(logDir, name + "-1")
+    assertTrue(logFile0.exists)
+    assertTrue(logFile1.exists)
+
+    log0.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log0.takeProducerSnapshot()
+
+    log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
+    log1.takeProducerSnapshot()
+
+    // 2. simulate an unclean shutdown by deleting the clean shutdown marker file
+    logManager.shutdown()
+    assertTrue(Files.deleteIfExists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+
+    // 3. create a new LogManager and start it in a different thread
+    @volatile var loadLogCalled = 0
+    logManager = spy(createLogManager())
+    doAnswer { invocation =>
+      // intercept LogManager.loadLog to sleep 5 seconds so that there is enough time to call LogManager.shutdown
+      // before LogManager.startup completes.
+      Thread.sleep(5000)
+      invocation.callRealMethod().asInstanceOf[UnifiedLog]
+      loadLogCalled = loadLogCalled + 1
+    }.when(logManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]],
+      any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]])
+
+    val t = new Thread() {
+      override def run(): Unit = { logManager.startup(Set.empty) }
+    }
+    t.start()
+
+    // 4. shutdown LogManager after the first log is loaded but before the second log is loaded
+    TestUtils.waitUntilTrue(() => loadLogCalled == 1,
+      "Timed out waiting for only the first log to be loaded")
+    logManager.shutdown()
+    logManager = null
+
+    // 5. verify that CleanShutdownFile is not created under logDir
+    assertFalse(Files.exists(new File(logDir, LogLoader.CleanShutdownFile).toPath))
+  }
+
   /**
    * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
    * The LogManager is configured with one invalid log directory which should be marked as offline.
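A subtlety in the test worth calling out: loadLogCalled is incremented only after callRealMethod() returns, and the 5-second sleep runs before it, so when waitUntilTrue observes loadLogCalled == 1 the first log is fully loaded while the second loadLog invocation is still sleeping (or has not yet begun). shutdown() therefore runs with recovery genuinely incomplete, and the final assertFalse pins the new behavior: no clean shutdown marker for a dir that never finished loading.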