KAFKA-70 Patch from Prashanth Menon to add space-based retention setting.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1171886 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Edward Jay Kreps 2011-09-17 03:21:06 +00:00
parent 6af334d69a
commit 54f49314af
3 changed files with 99 additions and 14 deletions

View File

@ -51,6 +51,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false) private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
private val logFlushIntervalMap = config.flushIntervalMap private val logFlushIntervalMap = config.flushIntervalMap
private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap) private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
private val logRetentionSize = config.logRetentionSize
/* Initialize a log for each subdirectory of the main log directory */ /* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]() private val logs = new Pool[String, Pool[Int, Log]]()
@ -193,6 +194,51 @@ private[kafka] class LogManager(val config: KafkaConfig,
log log
} }
/* Attempts to delete all provided segments from a log and returns how many it was able to */
/**
 * Attempts to delete each of the given segments from the log.
 * Each segment's backing message set is closed (errors swallowed and logged)
 * before the file delete is attempted.
 *
 * @return the number of segments whose files were successfully deleted
 */
private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int =
  segments.count { seg =>
    logger.info("Deleting log segment " + seg.file.getName() + " from " + log.name)
    // Close first so the file handle is released before the delete attempt.
    Utils.swallow(logger.warn, seg.messageSet.close())
    val removed = seg.file.delete()
    if (!removed)
      logger.warn("Delete failed.")
    removed
  }
/**
 * Deletes every segment of the log whose file modification time is older than
 * the retention window configured for the log's topic.
 *
 * @return the number of segments deleted
 */
private def cleanupExpiredSegments(log: Log): Int = {
  val now = time.milliseconds
  val topic = Utils.getTopicPartition(log.dir.getName)._1
  // A per-topic retention override, if configured, wins over the broker default.
  val maxAgeMs = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
  val expired = log.markDeletedWhile(now - _.file.lastModified > maxAgeMs)
  deleteSegments(log, expired)
}
/**
 * Deletes segments from the head of the log until its total size is at most
 * logRetentionSize bytes. A negative logRetentionSize disables size-based
 * retention entirely.
 *
 * @return the number of segments deleted
 */
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  if (logRetentionSize < 0 || log.size < logRetentionSize) {
    0
  } else {
    // Bytes that must still be shed to get back under the retention limit.
    var excess = log.size - logRetentionSize
    val candidates = log.markDeletedWhile { segment =>
      // Only drop a whole segment when doing so does not overshoot the target.
      val drop = excess - segment.size >= 0
      if (drop) excess -= segment.size
      drop
    }
    deleteSegments(log, candidates)
  }
}
/** /**
* Delete any eligible logs. Return the number of segments deleted. * Delete any eligible logs. Return the number of segments deleted.
*/ */
@ -204,19 +250,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
while(iter.hasNext) { while(iter.hasNext) {
val log = iter.next val log = iter.next
logger.debug("Garbage collecting '" + log.name + "'") logger.debug("Garbage collecting '" + log.name + "'")
var logCleanupThresholdMS = this.logCleanupDefaultAgeMs total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
val topic = Utils.getTopicPartition(log.dir.getName)._1
if (logRetentionMSMap.contains(topic))
logCleanupThresholdMS = logRetentionMSMap(topic)
val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
for(segment <- toBeDeleted) {
logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
Utils.swallow(logger.warn, segment.messageSet.close())
if(!segment.file.delete())
logger.warn("Delete failed.")
else
total += 1
}
} }
logger.debug("Log cleanup completed. " + total + " files deleted in " + logger.debug("Log cleanup completed. " + total + " files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds") (time.milliseconds - startMs) / 1000 + " seconds")

View File

@ -64,6 +64,9 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
/* the number of hours to keep a log file before deleting it */ /* the number of hours to keep a log file before deleting it */
val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue)) val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
/* the maximum size of the log before deleting it */
val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
/* the number of hours to keep a log file before deleting it for some specific topic*/ /* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", "")) val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))

View File

@ -59,7 +59,7 @@ class LogManagerTest extends JUnitSuite {
@Test @Test
def testCleanup() { def testCleanupExpiredSegments() {
val log = logManager.getOrCreateLog("cleanup", 0) val log = logManager.getOrCreateLog("cleanup", 0)
var offset = 0L var offset = 0L
for(i <- 0 until 1000) { for(i <- 0 until 1000) {
@ -86,6 +86,54 @@ class LogManagerTest extends JUnitSuite {
log.append(TestUtils.singleMessageSet("test".getBytes())) log.append(TestUtils.singleMessageSet("test".getBytes()))
} }
@Test
// Verifies size-based retention: with time-based retention effectively disabled
// (1h window, fresh files), cleanupLogs() must trim the log down to the
// configured logRetentionSize by deleting whole segments from the head.
def testCleanupSegmentsToMaintainSize() {
val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
val retentionHours = 1
val retentionMs = 1000 * 60 * 60 * retentionHours
val props = TestUtils.createBrokerConfig(0, -1)
// Shut down the suite's default manager so we can restart with a size-capped config.
logManager.close
Thread.sleep(100)
config = new KafkaConfig(props) {
override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
override val enableZookeeper = false
override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
override val logRetentionHours = retentionHours
}
logManager = new LogManager(config, null, time, -1, retentionMs, false)
logManager.startup
// create a log
val log = logManager.getOrCreateLog("cleanup", 0)
var offset = 0L
// add a bunch of messages that should be larger than the retentionSize
for(i <- 0 until 1000) {
val set = TestUtils.singleMessageSet("test".getBytes())
log.append(set)
offset += set.sizeInBytes
}
// flush to make sure it's written to disk, then sleep to confirm
log.flush
Thread.sleep(2000)
// should be exactly 100 full segments + 1 new empty one
// NOTE(review): message says "example" — presumably meant "exactly"; left as-is since it is a runtime string.
assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
// this cleanup shouldn't find any expired segments but should delete some to reduce size
logManager.cleanupLogs()
assertEquals("Now there should be exactly 7 segments", 6 + 1, log.numberOfSegments)
assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
// Reading from offset 0 must fail: the early segments were deleted above.
try {
log.read(0, 1024)
fail("Should get exception from fetching earlier.")
} catch {
case e: OffsetOutOfRangeException => "This is good."
}
// log should still be appendable
log.append(TestUtils.singleMessageSet("test".getBytes()))
}
@Test @Test
def testTimeBasedFlush() { def testTimeBasedFlush() {
val props = TestUtils.createBrokerConfig(0, -1) val props = TestUtils.createBrokerConfig(0, -1)