diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 7aa5bcd88d8..f26a84c6244 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -79,13 +79,15 @@ class LogManager(logDirs: Seq[File],
   private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
-  @volatile var currentDefaultConfig = initialDefaultConfig
+  @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
 
   def reconfigureDefaultLogConfig(logConfig: LogConfig): Unit = {
-    this.currentDefaultConfig = logConfig
+    this._currentDefaultConfig = logConfig
   }
 
+  def currentDefaultConfig: LogConfig = _currentDefaultConfig
+
   def liveLogDirs: Seq[File] = {
     if (_liveLogDirs.size == logDirs.size)
       logDirs
@@ -245,6 +247,9 @@ class LogManager(logDirs: Seq[File],
     this.logsToBeDeleted.add((log, time.milliseconds()))
   }
 
+  // Only for testing
+  private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
+
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
     debug("Loading log '" + logDir.getName + "'")
     val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -704,17 +709,27 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
-   * Delete logs marked for deletion.
+   * Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
+   * has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
+   * considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
+   * after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
+   * `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
    */
   private def deleteLogs(): Unit = {
+    var nextDelayMs = 0L
     try {
-      while (!logsToBeDeleted.isEmpty) {
-        val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
+      def nextDeleteDelayMs: Long = {
+        if (!logsToBeDeleted.isEmpty) {
+          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
+          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
+        } else
+          currentDefaultConfig.fileDeleteDelayMs
+      }
+
+      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
+        val (removedLog, _) = logsToBeDeleted.take()
         if (removedLog != null) {
           try {
-            val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
-            if (waitingTimeMs > 0)
-              Thread.sleep(waitingTimeMs)
             removedLog.delete()
             info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
           } catch {
@@ -730,7 +745,7 @@ class LogManager(logDirs: Seq[File],
       try {
         scheduler.schedule("kafka-delete-logs",
           deleteLogs _,
-          delay = currentDefaultConfig.fileDeleteDelayMs,
+          delay = nextDelayMs,
           unit = TimeUnit.MILLISECONDS)
       } catch {
         case e: Throwable =>
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 2fdda6b8827..d9efc236780 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -332,5 +332,10 @@ class LogManagerTest {
       assertNotEquals("File reference was not updated in index", fileBeforeDelete.getAbsolutePath,
         fileInIndex.get.getAbsolutePath)
     }
+
+    time.sleep(logManager.InitialTaskDelayMs)
+    assertTrue("Logs deleted too early", logManager.hasLogsToBeDeleted)
+    time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs)
+    assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted)
   }
 }