KAFKA-6710: Remove Thread.sleep from LogManager.deleteLogs (#4771)

`Thread.sleep` in `LogManager.deleteLogs` potentially blocks a scheduler thread for up to `log.segment.delete.delay.ms` with a default value of a minute. To avoid this, `deleteLogs` now deletes the logs for which `currentDefaultConfig.fileDeleteDelayMs` has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed are considered for deletion in the next iteration of `deleteLogs`, which is scheduled sooner if required.

Reviewers: Jun Rao <junrao@gmail.com>, Dong Lin <lindong28@gmail.com>, Ted Yu <yuzhihong@gmail.com>
This commit is contained in:
Rajini Sivaram 2018-03-24 20:54:09 +00:00 committed by GitHub
parent 514936af6f
commit f66aebff36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 9 deletions

View File

@ -79,13 +79,15 @@ class LogManager(logDirs: Seq[File],
private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
@volatile var currentDefaultConfig = initialDefaultConfig
@volatile private var _currentDefaultConfig = initialDefaultConfig
@volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
this.currentDefaultConfig = logConfig
this._currentDefaultConfig = logConfig
}
def currentDefaultConfig: LogConfig = _currentDefaultConfig
def liveLogDirs: Seq[File] = {
if (_liveLogDirs.size == logDirs.size)
logDirs
@ -245,6 +247,9 @@ class LogManager(logDirs: Seq[File],
this.logsToBeDeleted.add((log, time.milliseconds()))
}
// Only for testing
private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
debug("Loading log '" + logDir.getName + "'")
val topicPartition = Log.parseTopicPartitionName(logDir)
@ -704,17 +709,27 @@ class LogManager(logDirs: Seq[File],
}
/**
* Delete logs marked for deletion.
* Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
* has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
* considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
* after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
* `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
*/
private def deleteLogs(): Unit = {
var nextDelayMs = 0L
try {
while (!logsToBeDeleted.isEmpty) {
val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
def nextDeleteDelayMs: Long = {
if (!logsToBeDeleted.isEmpty) {
val (_, scheduleTimeMs) = logsToBeDeleted.peek()
scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
} else
currentDefaultConfig.fileDeleteDelayMs
}
while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
val (removedLog, _) = logsToBeDeleted.take()
if (removedLog != null) {
try {
val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
if (waitingTimeMs > 0)
Thread.sleep(waitingTimeMs)
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
@ -730,7 +745,7 @@ class LogManager(logDirs: Seq[File],
try {
scheduler.schedule("kafka-delete-logs",
deleteLogs _,
delay = currentDefaultConfig.fileDeleteDelayMs,
delay = nextDelayMs,
unit = TimeUnit.MILLISECONDS)
} catch {
case e: Throwable =>

View File

@ -332,5 +332,10 @@ class LogManagerTest {
assertNotEquals("File reference was not updated in index", fileBeforeDelete.getAbsolutePath,
fileInIndex.get.getAbsolutePath)
}
time.sleep(logManager.InitialTaskDelayMs)
assertTrue("Logs deleted too early", logManager.hasLogsToBeDeleted)
time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs)
assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted)
}
}