From 54f49314afbc1e13ff511f152e9b292ffaf9104d Mon Sep 17 00:00:00 2001
From: Edward Jay Kreps
Date: Sat, 17 Sep 2011 03:21:06 +0000
Subject: [PATCH] KAFKA-70 Patch from Prashanth Menon to add space-based retention setting.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1171886 13f79535-47bb-0310-9956-ffa450edef68
---
 .../src/main/scala/kafka/log/LogManager.scala |   60 +++++++++++++++----
 .../main/scala/kafka/server/KafkaConfig.scala |    3 +
 .../scala/unit/kafka/log/LogManagerTest.scala |   50 +++++++++++++++-
 3 files changed, 99 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 80e4163f1ee..698a5f38f12 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -51,6 +51,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
+  private val logRetentionSize = config.logRetentionSize
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -193,6 +194,51 @@ private[kafka] class LogManager(val config: KafkaConfig,
     log
   }
 
+  /* Attempts to delete all provided segments from a log and returns how many it was able to delete */
+  private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
+    var total = 0
+    for(segment <- segments) {
+      logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
+      Utils.swallow(logger.warn, segment.messageSet.close())
+      if(!segment.file.delete()) {
+        logger.warn("Delete failed.")
+      } else {
+        total += 1
+      }
+    }
+    total
+  }
+
+  /* Runs through the log removing segments older than a certain age */
+  private def cleanupExpiredSegments(log: Log): Int = {
+    val startMs = time.milliseconds
+    val topic = Utils.getTopicPartition(log.dir.getName)._1
+    val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+    val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
+    val total = deleteSegments(log, toBeDeleted)
+    total
+  }
+
+  /**
+   * Runs through the log removing the oldest segments for as long as
+   * the log remains at least logRetentionSize bytes in size
+   */
+  private def cleanupSegmentsToMaintainSize(log: Log): Int = {
+    if(logRetentionSize < 0 || log.size < logRetentionSize) return 0
+    var diff = log.size - logRetentionSize
+    def shouldDelete(segment: LogSegment) = {
+      if(diff - segment.size >= 0) {
+        diff -= segment.size
+        true
+      } else {
+        false
+      }
+    }
+    val toBeDeleted = log.markDeletedWhile(shouldDelete)
+    val total = deleteSegments(log, toBeDeleted)
+    total
+  }
+
   /**
    * Delete any eligible logs. Return the number of segments deleted.
   */
@@ -204,19 +250,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
     while(iter.hasNext) {
       val log = iter.next
       logger.debug("Garbage collecting '" + log.name + "'")
-      var logCleanupThresholdMS = this.logCleanupDefaultAgeMs
-      val topic = Utils.getTopicPartition(log.dir.getName)._1
-      if (logRetentionMSMap.contains(topic))
-        logCleanupThresholdMS = logRetentionMSMap(topic)
-      val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
-      for(segment <- toBeDeleted) {
-        logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
-        Utils.swallow(logger.warn, segment.messageSet.close())
-        if(!segment.file.delete())
-          logger.warn("Delete failed.")
-        else
-          total += 1
-      }
+      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
     logger.debug("Log cleanup completed. " + total + " files deleted in " +
                  (time.milliseconds - startMs) / 1000 + " seconds")
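The heart of the change is the selection loop in cleanupSegmentsToMaintainSize. As a reading aid, here is a standalone sketch of that loop; segmentsToDelete, segmentSizes, and the sample values are illustrative stand-ins for the real Log/LogSegment bookkeeping (segments assumed oldest-first), not code from the patch:

// Standalone sketch of the selection loop in cleanupSegmentsToMaintainSize.
// segmentSizes is assumed oldest-first; all names and values here are
// illustrative, not part of the patch.
object SizeRetentionSketch {
  def segmentsToDelete(segmentSizes: Seq[Long], logSize: Long, retentionSize: Long): Seq[Long] = {
    if (retentionSize < 0 || logSize < retentionSize) return Seq.empty
    var diff = logSize - retentionSize
    // keep deleting oldest segments while the remaining log stays >= retentionSize
    segmentSizes.takeWhile { size =>
      if (diff - size >= 0) { diff -= size; true } else false
    }
  }

  def main(args: Array[String]): Unit = {
    // ten 100-byte segments with a 350-byte retention size: six deletions
    // leave 400 bytes; a seventh would drop the log below the threshold
    val doomed = segmentsToDelete(Seq.fill(10)(100L), 1000L, 350L)
    println(doomed.size) // prints 6
  }
}

Note that the walk never removes a segment whose deletion would take the log below the threshold, so after cleanup the log still holds at least logRetentionSize bytes.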
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 08307d306af..8577f423aaf 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -64,6 +64,9 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
 
+  /* the maximum size of the log before deleting old segments (-1 disables size-based retention) */
+  val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
+
   /* the number of hours to keep a log file before deleting it for some specific topic */
   val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
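For context on how the new knob would be switched on: a hypothetical usage sketch, assuming the TestUtils.createBrokerConfig helper from this patch's test tree is available; the 1 GB figure and the standalone-broker override are illustrative, not defaults from the patch:

import java.util.Properties
import kafka.server.KafkaConfig
import kafka.utils.TestUtils

// Sketch only: createBrokerConfig fills in the required broker properties,
// and the 1 GB value is an illustrative choice.
object RetentionConfigSketch {
  def main(args: Array[String]): Unit = {
    val props: Properties = TestUtils.createBrokerConfig(0, -1)
    props.put("log.retention.size", (1024L * 1024 * 1024).toString) // cap each log near 1 GB
    val config = new KafkaConfig(props) {
      override val enableZookeeper = false // as the test does, to run standalone
    }
    println(config.logRetentionSize) // 1073741824; the default of -1 disables the size check
  }
}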
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 3cc83e3a06b..e088c98f706 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -59,7 +59,7 @@ class LogManagerTest extends JUnitSuite {
 
   @Test
-  def testCleanup() {
+  def testCleanupExpiredSegments() {
     val log = logManager.getOrCreateLog("cleanup", 0)
     var offset = 0L
     for(i <- 0 until 1000) {
@@ -86,6 +86,54 @@ class LogManagerTest extends JUnitSuite {
     log.append(TestUtils.singleMessageSet("test".getBytes()))
   }
 
+  @Test
+  def testCleanupSegmentsToMaintainSize() {
+    val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
+    val retentionHours = 1
+    val retentionMs = 1000 * 60 * 60 * retentionHours
+    val props = TestUtils.createBrokerConfig(0, -1)
+    logManager.close
+    Thread.sleep(100)
+    config = new KafkaConfig(props) {
+      override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will hold 10 messages
+      override val enableZookeeper = false
+      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
+      override val logRetentionHours = retentionHours
+    }
+    logManager = new LogManager(config, null, time, -1, retentionMs, false)
+    logManager.startup
+
+    // create a log
+    val log = logManager.getOrCreateLog("cleanup", 0)
+    var offset = 0L
+
+    // add a bunch of messages that should be larger than the retentionSize
+    for(i <- 0 until 1000) {
+      val set = TestUtils.singleMessageSet("test".getBytes())
+      log.append(set)
+      offset += set.sizeInBytes
+    }
+    // flush to make sure everything is written to disk, then sleep to let the flush complete
+    log.flush
+    Thread.sleep(2000)
+
+    // should be exactly 100 full segments + 1 new empty one
+    assertEquals("There should be exactly 101 segments.", 100 + 1, log.numberOfSegments)
+
+    // this cleanup shouldn't find any expired segments but should delete some to reduce size
+    logManager.cleanupLogs()
+    assertEquals("Now there should be exactly 7 segments.", 6 + 1, log.numberOfSegments)
+    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+    try {
+      log.read(0, 1024)
+      fail("Should get exception from fetching earlier.")
+    } catch {
+      case e: OffsetOutOfRangeException => "This is good."
+    }
+    // log should still be appendable
+    log.append(TestUtils.singleMessageSet("test".getBytes()))
+  }
+
   @Test
   def testTimeBasedFlush() {
     val props = TestUtils.createBrokerConfig(0, -1)
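The counts asserted in testCleanupSegmentsToMaintainSize above follow from the config arithmetic: 1000 appends at 10 message sets per segment give 100 full segments plus one empty rolled-over segment, and a retention size of just over five segments' worth allows 94 deletions. A quick worked check, with setSize fixed at an illustrative 64 bytes (the counts are the same for any positive message-set size):

// Worked check of the segment counts asserted in the test above.
// setSize stands in for TestUtils.singleMessageSet("test".getBytes()).sizeInBytes;
// the concrete 64 is illustrative only.
object SegmentCountCheck {
  def main(args: Array[String]): Unit = {
    val setSize = 64L
    val segmentSize = 10 * setSize            // logFileSize makes segments roll after 10 message sets
    val fullSegments = 1000 / 10              // 1000 appends -> 100 full segments plus 1 empty one
    val retentionSize = 5 * 10 * setSize + 10 // just over five segments' worth
    var diff = fullSegments * segmentSize - retentionSize
    var deleted = 0
    while (diff - segmentSize >= 0) { diff -= segmentSize; deleted += 1 }
    println(s"deleted=$deleted remaining=${fullSegments - deleted + 1}") // deleted=94 remaining=7
  }
}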