KAFKA-1981; Make log compaction point configurable

Now uses LogSegment.largestTimestamp to determine the age of a segment's messages.

Author: Eric Wasserman <eric.wasserman@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1794 from ewasserman/feat-1981
Eric Wasserman authored 2016-09-11 20:45:05 -05:00, committed by Jun Rao
parent 1933f12a53, commit 5f040cd77f
11 changed files with 521 additions and 96 deletions

core/src/main/scala/kafka/log/LogCleaner.scala

@ -36,7 +36,9 @@ import scala.collection._
* A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
*
* Each log can be thought of being split into two sections of segments: a "clean" section which has previously been cleaned followed by a
* "dirty" section that has not yet been cleaned. The active log segment is always excluded from cleaning.
* "dirty" section that has not yet been cleaned. The dirty section is further divided into the "cleanable" section followed by an "uncleanable" section.
* The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. If there is a
* compaction lag time set, segments whose largest message timestamp is within the compaction lag time of the cleaning operation are also uncleanable.
*
* The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy
* and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.
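
As an illustration of the sections described above, here is a minimal, self-contained sketch (hypothetical names and offsets, not part of this patch) of how the two boundary offsets partition a log:

// Hypothetical sketch: two offsets split the log into clean, cleanable and uncleanable sections.
case class Sections(firstDirtyOffset: Long, firstUncleanableOffset: Long)

def section(offset: Long, s: Sections): String =
  if (offset < s.firstDirtyOffset) "clean"                 // already compacted
  else if (offset < s.firstUncleanableOffset) "cleanable"  // dirty and old enough to compact
  else "uncleanable"                                       // active segment, or within the compaction lag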
@ -227,7 +229,7 @@ class LogCleaner(val config: CleanerConfig,
* Clean a log if there is a dirty log available, otherwise sleep for a bit
*/
private def cleanOrSleep() {
val cleaned = cleanerManager.grabFilthiestCompactedLog() match {
val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
case None =>
false
case Some(cleanable) =>
@ -339,7 +341,7 @@ private[log] class Cleaner(val id: Int,
// build the offset map
info("Building offset map for %s...".format(cleanable.log.name))
val upperBoundOffset = log.activeSegment.baseOffset
val upperBoundOffset = cleanable.firstUncleanableOffset
buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap)
val endOffset = offsetMap.latestOffset + 1
stats.indexDone()
@ -351,9 +353,13 @@ private[log] class Cleaner(val id: Int,
case None => 0L
case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs
}
// determine the timestamp up to which the log will be cleaned
// this is the largest timestamp of the last cleanable segment, i.e. the lower of the last segment before the active segment and the bound imposed by the compaction lag
val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.largestTimestamp).getOrElse(0L)
// group the segments and clean the groups
info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
cleanSegments(log, group, offsetMap, deleteHorizonMs)
@ -627,7 +633,7 @@ private[log] class Cleaner(val id: Int,
}
/**
* Build a map of key_hash => offset for the keys in the dirty portion of the log to use in cleaning.
* Build a map of key_hash => offset for the keys in the cleanable dirty portion of the log to use in cleaning.
* @param log The log to use
* @param start The offset at which dirty messages begin
* @param end The ending offset for the map that is being built
@ -638,7 +644,7 @@ private[log] class Cleaner(val id: Int,
val dirty = log.logSegments(start, end).toBuffer
info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
// Add all the dirty segments. We must take at least map.slots * load_factor,
// Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
// but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
var full = false
for (segment <- dirty if !full) {
@ -749,12 +755,14 @@ private case class CleanerStats(time: Time = SystemTime) {
}
/**
* Helper class for a log, its topic/partition, and the last clean position
* Helper class for a log, its topic/partition, the first cleanable position, and the first uncleanable dirty position
*/
private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] {
private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
val cleanableRatio = dirtyBytes / totalBytes.toDouble
def totalBytes = cleanBytes + dirtyBytes
private[this] val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment)
val firstUncleanableOffset = firstUncleanableSegment.baseOffset
val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size).sum
val totalBytes = cleanBytes + cleanableBytes
val cleanableRatio = cleanableBytes / totalBytes.toDouble
override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
}
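
To see how this ordering drives the cleaner's choice, a minimal sketch follows (byte counts are hypothetical): the manager keeps the candidate with the maximum cleanable ratio, so a log whose dirty bytes all fall inside the compaction lag contributes no cleanable bytes, has a ratio of 0, and is skipped until the lag expires.

case class Candidate(name: String, cleanBytes: Long, cleanableBytes: Long) {
  val cleanableRatio: Double = cleanableBytes / (cleanBytes + cleanableBytes).toDouble
}

val candidates = Seq(
  Candidate("a", cleanBytes = 800, cleanableBytes = 200), // ratio 0.20
  Candidate("b", cleanBytes = 100, cleanableBytes = 300)  // ratio 0.75
)
val filthiest = candidates.maxBy(_.cleanableRatio)        // "b" is cleaned first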

core/src/main/scala/kafka/log/LogCleanerManager.scala

@ -18,15 +18,17 @@
package kafka.log
import java.io.File
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import kafka.utils.{Logging, Pool}
import kafka.server.OffsetCheckpoint
import collection.mutable
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.CoreUtils._
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.OffsetCheckpoint
import kafka.utils.CoreUtils._
import kafka.utils.{Logging, Pool, Time}
import scala.collection.{immutable, mutable}
private[log] sealed trait LogCleaningState
private[log] case object LogCleaningInProgress extends LogCleaningState
@ -43,6 +45,8 @@ private[log] case object LogCleaningPaused extends LogCleaningState
*/
private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
import LogCleanerManager._
override val loggerName = classOf[LogCleaner].getName
// package-private for testing
@ -67,39 +71,27 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
/**
* @return the position processed for all logs.
*/
def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
def allCleanerCheckpoints: Map[TopicAndPartition, Long] =
checkpoints.values.flatMap(_.read()).toMap
/**
* Choose the log to clean next and add it to the in-progress set. We recompute this
* every time off the full set of logs to allow logs to be dynamically added to the pool of logs
* each time from the full set of logs to allow logs to be dynamically added to the pool of logs
* the log manager maintains.
*/
def grabFilthiestCompactedLog(): Option[LogToClean] = {
def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
inLock(lock) {
val lastClean = allCleanerCheckpoints()
val now = time.milliseconds
val lastClean = allCleanerCheckpoints
val dirtyLogs = logs.filter {
case (_, log) => log.config.compact // match logs that are marked as compacted
}.filterNot {
case (topicAndPartition, _) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
}.map {
case (topicAndPartition, log) => // create a LogToClean instance for each
// if the log segments are abnormally truncated and hence the checkpointed offset
// is no longer valid, reset to the log starting offset and log the error event
val logStartOffset = log.logSegments.head.baseOffset
val firstDirtyOffset = {
val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
if (offset < logStartOffset) {
// don't bother with the warning if compact and delete are enabled.
if (!isCompactAndDelete(log))
warn("Resetting first dirty offset for %s to log start offset %d since the checkpointed offset %d is invalid."
.format(topicAndPartition, logStartOffset, offset))
logStartOffset
} else {
offset
}
}
LogToClean(topicAndPartition, log, firstDirtyOffset)
val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicAndPartition,
lastClean, now)
LogToClean(topicAndPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
@ -131,10 +123,6 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
}
def isCompactAndDelete(log: Log): Boolean = {
log.config.compact && log.config.delete
}
/**
* Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
* the partition is aborted.
@ -145,7 +133,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
abortAndPauseCleaning(topicAndPartition)
resumeCleaning(topicAndPartition)
}
info("The cleaning for partition %s is aborted".format(topicAndPartition))
info(s"The cleaning for partition $topicAndPartition is aborted")
}
/**
@ -168,14 +156,13 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
case LogCleaningInProgress =>
inProgress.put(topicAndPartition, LogCleaningAborted)
case s =>
throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
.format(topicAndPartition, s))
throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be aborted and paused since it is in $s state.")
}
}
while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
}
info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
info(s"The cleaning for partition $topicAndPartition is aborted and paused")
}
/**
@ -185,19 +172,17 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
inLock(lock) {
inProgress.get(topicAndPartition) match {
case None =>
throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
.format(topicAndPartition))
throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is not paused.")
case Some(state) =>
state match {
case LogCleaningPaused =>
inProgress.remove(topicAndPartition)
case s =>
throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
.format(topicAndPartition, s))
throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is in $s state.")
}
}
}
info("Compaction for partition %s is resumed".format(topicAndPartition))
info(s"Compaction for partition $topicAndPartition is resumed")
}
/**
@ -257,7 +242,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
inProgress.put(topicAndPartition, LogCleaningPaused)
pausedCleaningCond.signalAll()
case s =>
throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s))
throw new IllegalStateException(s"In-progress partition $topicAndPartition cannot be in $s state.")
}
}
}
@ -268,3 +253,68 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
}
}
}
private[log] object LogCleanerManager extends Logging {
def isCompactAndDelete(log: Log): Boolean = {
log.config.compact && log.config.delete
}
/**
* Returns the range of dirty offsets that can be cleaned.
*
* @param log the log
* @param topicAndPartition the topic and partition of the log
* @param lastClean the map of checkpointed offsets
* @param now the current time in milliseconds of the cleaning operation
* @return the lower (inclusive) and upper (exclusive) offsets
*/
def cleanableOffsets(log: Log, topicAndPartition: TopicAndPartition, lastClean: immutable.Map[TopicAndPartition, Long], now: Long): (Long, Long) = {
// the checkpointed offset, i.e., the first offset of the next dirty segment
val lastCleanOffset: Option[Long] = lastClean.get(topicAndPartition)
// If the log segments have been abnormally truncated and hence the checkpointed offset is no longer valid,
// reset to the log start offset and log the error
val logStartOffset = log.logSegments.head.baseOffset
val firstDirtyOffset = {
val offset = lastCleanOffset.getOrElse(logStartOffset)
if (offset < logStartOffset) {
// don't bother with the warning if compact and delete are enabled.
if (!isCompactAndDelete(log))
warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid.")
logStartOffset
} else {
offset
}
}
// dirty log segments
val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset).toArray
val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
// may be cleaned
val firstUncleanableDirtyOffset: Long = Seq (
// the active segment is always uncleanable
Option(log.activeSegment.baseOffset),
// the first segment whose largest message timestamp is within a minimum time lag from now
if (compactionLagMs > 0) {
dirtyNonActiveSegments.find {
s =>
val isUncleanable = s.largestTimestamp > now - compactionLagMs
debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
isUncleanable
} map(_.baseOffset)
} else None
).flatten.min
debug(s"Finding range of cleanable offsets for log=${log.name} topicAndPartition=$topicAndPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")
(firstDirtyOffset, firstUncleanableDirtyOffset)
}
}
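
The lag check above can be isolated into a self-contained sketch (simplified segment type, hypothetical timestamps) that mirrors the Seq(...).flatten.min pattern in cleanableOffsets:

case class Seg(baseOffset: Long, largestTimestamp: Long)

def firstUncleanable(dirtySegs: Seq[Seg], activeBase: Long, now: Long, lagMs: Long): Long =
  Seq(
    Option(activeBase), // the active segment is always uncleanable
    // the first dirty segment with a message newer than (now - lag) starts the uncleanable section
    if (lagMs > 0) dirtySegs.find(_.largestTimestamp > now - lagMs).map(_.baseOffset) else None
  ).flatten.min

// With now = 1000 and lagMs = 400, the segment whose newest message has timestamp 700 (> 600)
// is still within the lag, so the uncleanable section starts at its base offset:
firstUncleanable(Seq(Seg(0, 100), Seg(10, 700)), activeBase = 20, now = 1000, lagMs = 400) // == 10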

core/src/main/scala/kafka/log/LogConfig.scala

@ -45,6 +45,7 @@ object Defaults {
val IndexInterval = kafka.server.Defaults.LogIndexIntervalBytes
val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs
val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs
val MinCompactionLagMs = kafka.server.Defaults.LogCleanerMinCompactionLagMs
val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
val Compact = kafka.server.Defaults.LogCleanupPolicy
val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
@ -73,6 +74,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
val indexInterval = getInt(LogConfig.IndexIntervalBytesProp)
val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
val compactionLagMs = getLong(LogConfig.MinCompactionLagMsProp)
val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
val compact = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Compact)
val delete = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Delete)
@ -108,6 +110,7 @@ object LogConfig {
val MaxMessageBytesProp = "max.message.bytes"
val IndexIntervalBytesProp = "index.interval.bytes"
val DeleteRetentionMsProp = "delete.retention.ms"
val MinCompactionLagMsProp = "min.compaction.lag.ms"
val FileDeleteDelayMsProp = "file.delete.delay.ms"
val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
val CleanupPolicyProp = "cleanup.policy"
@ -162,6 +165,8 @@ object LogConfig {
"on the time in which a consumer must complete a read if they begin from offset 0 " +
"to ensure that they get a valid snapshot of the final stage (otherwise delete " +
"tombstones may be collected before they complete their scan)."
val MinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. " +
"Only applicable for logs that are being compacted."
val MinCleanableRatioDoc = "This configuration controls how frequently the log " +
"compactor will attempt to clean the log (assuming <a href=\"#compaction\">log " +
"compaction</a> is enabled). By default we will avoid cleaning a log where more than " +
@ -253,6 +258,8 @@ object LogConfig {
KafkaConfig.LogIndexIntervalBytesProp)
.define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM,
DeleteRetentionMsDoc, KafkaConfig.LogCleanerDeleteRetentionMsProp)
.define(MinCompactionLagMsProp, LONG, Defaults.MinCompactionLagMs, atLeast(0), MEDIUM, MinCompactionLagMsDoc,
KafkaConfig.LogCleanerMinCompactionLagMsProp)
.define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc,
KafkaConfig.LogDeleteDelayMsProp)
.define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
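
As a usage sketch (illustrative value), the new property can be set on a topic-level LogConfig the same way the tests in this patch do:

import java.util.Properties

val logProps = new Properties()
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
// keep messages uncompacted for at least one hour
logProps.put(LogConfig.MinCompactionLagMsProp, (60 * 60 * 1000L): java.lang.Long)
val config = LogConfig(logProps)
assert(config.compactionLagMs == 60 * 60 * 1000L)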

core/src/main/scala/kafka/server/KafkaConfig.scala

@ -89,6 +89,7 @@ object Defaults {
val LogCleanerMinCleanRatio = 0.5d
val LogCleanerEnable = true
val LogCleanerDeleteRetentionMs = 24 * 60 * 60 * 1000L
val LogCleanerMinCompactionLagMs = 0L
val LogIndexSizeMaxBytes = 10 * 1024 * 1024
val LogIndexIntervalBytes = 4096
val LogFlushIntervalMessages = Long.MaxValue
@ -255,6 +256,7 @@ object KafkaConfig {
val LogCleanerMinCleanRatioProp = "log.cleaner.min.cleanable.ratio"
val LogCleanerEnableProp = "log.cleaner.enable"
val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
val LogCleanerMinCompactionLagMsProp = "log.cleaner.min.compaction.lag.ms"
val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
val LogIndexIntervalBytesProp = "log.index.interval.bytes"
val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
@ -434,6 +436,7 @@ object KafkaConfig {
val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to be eligible for cleaning"
val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server? Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index"
val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk "
@ -633,6 +636,7 @@ object KafkaConfig {
.define(LogCleanerMinCleanRatioProp, DOUBLE, Defaults.LogCleanerMinCleanRatio, MEDIUM, LogCleanerMinCleanRatioDoc)
.define(LogCleanerEnableProp, BOOLEAN, Defaults.LogCleanerEnable, MEDIUM, LogCleanerEnableDoc)
.define(LogCleanerDeleteRetentionMsProp, LONG, Defaults.LogCleanerDeleteRetentionMs, MEDIUM, LogCleanerDeleteRetentionMsDoc)
.define(LogCleanerMinCompactionLagMsProp, LONG, Defaults.LogCleanerMinCompactionLagMs, MEDIUM, LogCleanerMinCompactionLagMsDoc)
.define(LogIndexSizeMaxBytesProp, INT, Defaults.LogIndexSizeMaxBytes, atLeast(4), MEDIUM, LogIndexSizeMaxBytesDoc)
.define(LogIndexIntervalBytesProp, INT, Defaults.LogIndexIntervalBytes, atLeast(0), MEDIUM, LogIndexIntervalBytesDoc)
.define(LogFlushIntervalMessagesProp, LONG, Defaults.LogFlushIntervalMessages, atLeast(1), HIGH, LogFlushIntervalMessagesDoc)
@ -833,6 +837,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
val logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
val logCleanerMinCompactionLagMs = getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
val logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)

core/src/main/scala/kafka/server/KafkaServer.scala

@ -68,6 +68,7 @@ object KafkaServer {
logProps.put(LogConfig.MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
logProps.put(LogConfig.MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
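
The new line above seeds the broker-wide default into the default topic configuration; a per-topic min.compaction.lag.ms still overrides it. A sketch of that precedence (illustrative values, using the LogConfig.fromProps helper the tests also use):

import java.util.Properties

val brokerDefaults = new Properties()
brokerDefaults.put(LogConfig.MinCompactionLagMsProp, 0L: java.lang.Long)       // broker-wide default
val topicOverrides = new Properties()
topicOverrides.put(LogConfig.MinCompactionLagMsProp, 3600000L: java.lang.Long) // topic-level override
val effective = LogConfig.fromProps(brokerDefaults, topicOverrides)
assert(effective.compactionLagMs == 3600000L)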

core/src/test/scala/unit/kafka/log/CleanerTest.scala

@ -48,7 +48,7 @@ class CleanerTest extends JUnitSuite {
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
@After
def teardown() {
def teardown(): Unit = {
Utils.delete(tmpdir)
}
@ -56,7 +56,7 @@ class CleanerTest extends JUnitSuite {
* Test simple log cleaning
*/
@Test
def testCleanSegments() {
def testCleanSegments(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
@ -81,7 +81,7 @@ class CleanerTest extends JUnitSuite {
}
@Test
def testCleaningWithDeletes() {
def testCleaningWithDeletes(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
@ -101,14 +101,14 @@ class CleanerTest extends JUnitSuite {
while(log.numberOfSegments < 4)
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
val keys = keysInLog(log).toSet
assertTrue("None of the keys we deleted should still exist.",
(0 until leo.toInt by 2).forall(!keys.contains(_)))
}
@Test
def testPartialSegmentClean() {
def testPartialSegmentClean(): Unit = {
// because loadFactor is 0.75, this means we can fit 2 messages in the map
var cleaner = makeCleaner(2)
val logProps = new Properties()
@ -125,22 +125,64 @@ class CleanerTest extends JUnitSuite {
log.roll()
// clean the log with only one message removed
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
assertEquals(immutable.List(1,0,1,0), keysInLog(log))
assertEquals(immutable.List(1,2,3,4), offsetsInLog(log))
// continue to make progress, even though we can only clean one message at a time
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3, log.activeSegment.baseOffset))
assertEquals(immutable.List(0,1,0), keysInLog(log))
assertEquals(immutable.List(2,3,4), offsetsInLog(log))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4, log.activeSegment.baseOffset))
assertEquals(immutable.List(1,0), keysInLog(log))
assertEquals(immutable.List(3,4), offsetsInLog(log))
}
@Test
def testLogToClean: Unit = {
def testCleaningWithUncleanableSection(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// Number of distinct keys. For an effective test this should be small enough such that each log segment contains some duplicates.
val N = 10
val numCleanableSegments = 2
val numTotalSegments = 7
// append messages with the keys 0 through N-1, values equal offset
while(log.numberOfSegments <= numCleanableSegments)
log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
// at this point one message past the cleanable segments has been added
// the entire segment containing the first uncleanable offset should not be cleaned.
val firstUncleanableOffset = log.logEndOffset + 1 // +1 so it is past the baseOffset
while(log.numberOfSegments < numTotalSegments - 1)
log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
// the last (active) segment has just one message
def distinctValuesBySegment = log.logSegments.map(s => s.log.map(m => TestUtils.readString(m.message.payload)).toSet.size).toSeq
val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
distinctValuesBySegment.reverse.tail.forall(_ > N))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, firstUncleanableOffset))
val distinctValuesBySegmentAfterClean = distinctValuesBySegment
assertTrue("The cleanable segments should have fewer distinct values after cleaning",
distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean).take(numCleanableSegments).forall { case (before, after) => after < before })
assertTrue("The uncleanable segments should have the same number of distinct values after cleaning", distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
.slice(numCleanableSegments, numTotalSegments).forall { x => x._1 == x._2 })
}
@Test
def testLogToClean(): Unit = {
// create a log with small segment size
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
@ -151,14 +193,44 @@ class CleanerTest extends JUnitSuite {
for (i <- 0 until 6)
log.append(messageSet, assignOffsets = true)
val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset)
val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
assertEquals("Total bytes of LogToClean should equal size of all segments excluding the active segment",
logToClean.totalBytes, log.size - log.activeSegment.size)
}
@Test
def testCleaningWithUnkeyedMessages {
def testLogToCleanWithUncleanableSection(): Unit = {
// create a log with small segment size
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
for (i <- 0 until 6)
log.append(messageSet, assignOffsets = true)
// segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
val segs = log.logSegments.toSeq
val logToClean = LogToClean(TopicAndPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset)
val expectedCleanSize = segs.take(2).map(_.size).sum
val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
val expectedUncleanableSize = segs.drop(4).map(_.size).sum
assertEquals("Clean bytes of LogToClean should equal the size of all segments prior to the one containing the first dirty offset",
logToClean.cleanBytes, expectedCleanSize)
assertEquals("Cleanable bytes of LogToClean should equal the size of all segments from the one containing the first dirty offset" +
" to the segment prior to the one with the first uncleanable offset",
logToClean.cleanableBytes, expectedCleanableSize)
assertEquals("Total bytes should be the sum of the clean and cleanable segments", logToClean.totalBytes, expectedCleanSize + expectedCleanableSize)
assertEquals("Total cleanable ratio should be the ratio of cleanable size to clean plus cleanable", logToClean.cleanableRatio,
expectedCleanableSize / (expectedCleanSize + expectedCleanableSize).toDouble, 1.0e-6d)
}
@Test
def testCleaningWithUnkeyedMessages(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
// create a log with compaction turned off so we can append unkeyed messages
@ -180,7 +252,7 @@ class CleanerTest extends JUnitSuite {
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))
assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size)
@ -198,7 +270,7 @@ class CleanerTest extends JUnitSuite {
def unkeyedMessageCountInLog(log: Log) =
log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
def abortCheckDone(topicAndPartition: TopicAndPartition) {
def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
throw new LogCleaningAbortedException()
}
@ -206,7 +278,7 @@ class CleanerTest extends JUnitSuite {
* Test that abortion during cleaning throws a LogCleaningAbortedException
*/
@Test
def testCleanSegmentsWithAbort() {
def testCleanSegmentsWithAbort(): Unit = {
val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
@ -229,7 +301,7 @@ class CleanerTest extends JUnitSuite {
* Validate the logic for grouping log segments together for cleaning
*/
@Test
def testSegmentGrouping() {
def testSegmentGrouping(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
@ -282,7 +354,7 @@ class CleanerTest extends JUnitSuite {
* stored in 4 bytes.
*/
@Test
def testSegmentGroupingWithSparseOffsets() {
def testSegmentGroupingWithSparseOffsets(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
@ -326,7 +398,7 @@ class CleanerTest extends JUnitSuite {
}
private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]) {
private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
val offsets = groups.flatMap(_.map(_.baseOffset))
assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)
}
@ -335,7 +407,7 @@ class CleanerTest extends JUnitSuite {
* Test building an offset map off the log
*/
@Test
def testBuildOffsetMap() {
def testBuildOffsetMap(): Unit = {
val map = new FakeOffsetMap(1000)
val log = makeLog()
val cleaner = makeCleaner(Int.MaxValue)
@ -369,7 +441,7 @@ class CleanerTest extends JUnitSuite {
* </ol>
*/
@Test
def testRecoveryAfterCrash() {
def testRecoveryAfterCrash(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
@ -428,7 +500,6 @@ class CleanerTest extends JUnitSuite {
for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
}
System.out.println("here")
log = recoverAndCheck(config, cleanedKeys)
// add some more messages and clean the log again
@ -463,7 +534,7 @@ class CleanerTest extends JUnitSuite {
}
@Test
def testBuildOffsetMapFakeLarge() {
def testBuildOffsetMapFakeLarge(): Unit = {
val map = new FakeOffsetMap(1000)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
@ -488,7 +559,7 @@ class CleanerTest extends JUnitSuite {
* Test building a partial offset map of part of a log segment
*/
@Test
def testBuildPartialOffsetMap() {
def testBuildPartialOffsetMap(): Unit = {
// because loadFactor is 0.75, this means we can fit 2 messages in the map
val map = new FakeOffsetMap(3)
val log = makeLog()
@ -581,7 +652,7 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
-1L
}
def clear() = map.clear()
def clear(): Unit = map.clear()
def size: Int = map.size

core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala

@ -92,7 +92,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
}
@Test
def testCleansCombinedCompactAndDeleteTopic() {
def testCleansCombinedCompactAndDeleteTopic(): Unit = {
val logProps = new Properties()
val retentionMs: Integer = 100000
logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
@ -144,7 +144,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
}
@Test
def testCleanerWithMessageFormatV0() {
def testCleanerWithMessageFormatV0(): Unit = {
val largeMessageKey = 20
val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V0)
val maxMessageSize = codec match {
@ -232,7 +232,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
}
@After
def tearDown() {
def tearDown(): Unit = {
cleaner.shutdown()
time.scheduler.shutdown()
Utils.delete(logDir)

core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.File
import java.util.Properties
import kafka.common.TopicAndPartition
import kafka.message._
import kafka.utils._
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import scala.collection._
/**
* This is an integration test that tests the fully integrated log cleaner
*/
@RunWith(value = classOf[Parameterized])
class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging {
val msPerHour = 60 * 60 * 1000
val compactionLag = 1 * msPerHour
assertTrue("compactionLag must be divisible by 2 for this test", compactionLag % 2 == 0)
val time = new MockTime(1400000000000L) // Tue May 13 16:53:20 UTC 2014
val cleanerBackOffMs = 200L
val segmentSize = 100
val deleteDelay = 1000
val logName = "log"
val logDir = TestUtils.tempDir()
var counter = 0
val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
val compressionCodec = CompressionCodec.getCompressionCodec(compressionCodecName)
@Test
def cleanerTest(): Unit = {
val cleaner = makeCleaner(parts = 3, backOffMs = cleanerBackOffMs)
val log = cleaner.logs.get(topics(0))
// t = T0
val T0 = time.milliseconds
val appends0 = writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T0)
val startSizeBlock0 = log.size
debug(s"total log size at T0: $startSizeBlock0")
val activeSegAtT0 = log.activeSegment
debug(s"active segment at T0 has base offset: ${activeSegAtT0.baseOffset}")
val sizeUpToActiveSegmentAtT0 = log.logSegments(0L, activeSegAtT0.baseOffset).map(_.size).sum
debug(s"log size up to base offset of active segment at T0: $sizeUpToActiveSegmentAtT0")
cleaner.startup()
// T0 < t < T1
// advance to a time still less than one compaction lag from start
time.sleep(compactionLag/2)
Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to _not_ clean
assertEquals("There should be no cleaning until the compaction lag has passed", startSizeBlock0, log.size)
// t = T1 > T0 + compactionLag
// advance to time a bit more than one compaction lag from start
time.sleep(compactionLag/2 + 1)
val T1 = time.milliseconds
// write another block of data
val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T1)
val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset
// the first block should get cleaned
cleaner.awaitCleaned("log", 0, activeSegAtT0.baseOffset)
// check the data is the same
val read1 = readFromLog(log)
assertEquals("Contents of the map shouldn't change.", appends1.toMap, read1.toMap)
val compactedSize = log.logSegments(0L, activeSegAtT0.baseOffset).map(_.size).sum
debug(s"after cleaning the compacted size up to active segment at T0: $compactedSize")
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
assertTrue(s"log cleaner should have processed up to offset $firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned", lastCleaned >= firstBlock1SegmentBaseOffset)
assertTrue(s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize",
sizeUpToActiveSegmentAtT0 > compactedSize)
cleaner.logs.remove(topics(0))
cleaner.shutdown()
}
private def readFromLog(log: Log): Iterable[(Int, Int)] = {
for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- {
// create single message iterator or deep iterator depending on compression codec
if (entry.message.compressionCodec == NoCompressionCodec)
Stream.cons(entry, Stream.empty).iterator
else
ByteBufferMessageSet.deepIterator(entry)
}) yield {
val key = TestUtils.readString(messageAndOffset.message.key).toInt
val value = TestUtils.readString(messageAndOffset.message.payload).toInt
key -> value
}
}
private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = {
for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
val count = counter
val info = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
counter += 1
(key, count)
}
}
@After
def teardown(): Unit = {
time.scheduler.shutdown()
Utils.delete(logDir)
}
/* create a cleaner instance and logs with the given parameters */
private def makeCleaner(parts: Int,
minCleanableDirtyRatio: Float = 0.0F,
numThreads: Int = 1,
backOffMs: Long = 200L,
defaultPolicy: String = "compact",
policyOverrides: Map[String, String] = Map()): LogCleaner = {
// create partitions and add them to the pool
val logs = new Pool[TopicAndPartition, Log]()
for(i <- 0 until parts) {
val dir = new File(logDir, "log-" + i)
dir.mkdirs()
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
logProps.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
val log = new Log(dir = dir,
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
logs.put(TopicAndPartition("log", i), log)
}
new LogCleaner(CleanerConfig(numThreads = numThreads, backOffMs = backOffMs),
logDirs = Array(logDir),
logs = logs,
time = time)
}
}
object LogCleanerLagIntegrationTest {
def oneParameter: java.util.Collection[Array[String]] = {
val l = new java.util.ArrayList[Array[String]]()
l.add(Array("NONE"))
l
}
@Parameters
def parameters: java.util.Collection[Array[String]] = {
val list = new java.util.ArrayList[Array[String]]()
for (codec <- CompressionType.values)
list.add(Array(codec.name))
list
}
}

core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -14,27 +14,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.File
import java.util.Properties
import kafka.common.TopicAndPartition
import kafka.message.ByteBufferMessageSet
import kafka.utils.{MockTime, Pool, TestUtils}
import kafka.common._
import kafka.message._
import kafka.utils._
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.{After, Test}
import org.scalatest.junit.JUnitSuite
class LogCleanerManagerTest extends JUnitSuite {
/**
* Unit tests for the log cleaning logic
*/
class LogCleanerManagerTest extends JUnitSuite with Logging {
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val time = new MockTime()
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val logConfig = LogConfig(logProps)
val time = new MockTime(1400000000000L) // Tue May 13 16:53:20 UTC 2014
@After
def tearDown() {
def tearDown(): Unit = {
Utils.delete(tmpDir)
}
@ -44,7 +53,7 @@ class LogCleanerManagerTest extends JUnitSuite {
* as they are handled by the LogManager
*/
@Test
def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs() {
def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs(): Unit = {
val messageSet = TestUtils.singleMessageSet("test".getBytes)
val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Delete)
val cleanerManager: LogCleanerManager = createCleanerManager(log)
@ -80,15 +89,91 @@ class LogCleanerManagerTest extends JUnitSuite {
assertEquals("should have 0 logs ready to be deleted", 0, readyToDelete)
}
/**
* Test computation of cleanable range with no minimum compaction lag settings active
*/
@Test
def testCleanableOffsetsForNone(): Unit = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
while(log.numberOfSegments < 8)
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds))
val topicAndPartition = TopicAndPartition("log", 0)
val lastClean = Map(topicAndPartition -> 0L)
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
}
/**
* Test computation of cleanable range with a minimum compaction lag time
*/
@Test
def testCleanableOffsetsForTime(): Unit = {
val compactionLag = 60 * 60 * 1000
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val t0 = time.milliseconds
while(log.numberOfSegments < 4)
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
val activeSegAtT0 = log.activeSegment
time.sleep(compactionLag + 1)
val t1 = time.milliseconds
while (log.numberOfSegments < 8)
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1))
val topicAndPartition = TopicAndPartition("log", 0)
val lastClean = Map(topicAndPartition -> 0L)
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
assertEquals("The first uncleanable offset begins with the second block of log entries.", activeSegAtT0.baseOffset, cleanableOffsets._2)
}
/**
* Test computation of cleanable range with a minimum compaction lag time that is small enough that
* the active segment contains it.
*/
@Test
def testCleanableOffsetsForShortTime(): Unit = {
val compactionLag = 60 * 60 * 1000
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val t0 = time.milliseconds
while (log.numberOfSegments < 8)
log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
time.sleep(compactionLag + 1)
val topicAndPartition = TopicAndPartition("log", 0)
val lastClean = Map(topicAndPartition -> 0L)
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds)
assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
}
def createCleanerManager(log: Log): LogCleanerManager = {
private def createCleanerManager(log: Log): LogCleanerManager = {
val logs = new Pool[TopicAndPartition, Log]()
logs.put(TopicAndPartition("log", 0), log)
val cleanerManager = new LogCleanerManager(Array(logDir), logs)
cleanerManager
}
def appendMessagesAndExpireSegments(set: ByteBufferMessageSet, log: Log): Unit = {
private def appendMessagesAndExpireSegments(set: ByteBufferMessageSet, log: Log): Unit = {
// append some messages to create some segments
for (i <- 0 until 100)
log.append(set)
@ -97,7 +182,7 @@ class LogCleanerManagerTest extends JUnitSuite {
log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
}
def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
logProps.put(LogConfig.RetentionMsProp, 1: Integer)
@ -113,5 +198,13 @@ class LogCleanerManagerTest extends JUnitSuite {
log
}
private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
private def message(key: Int, value: Int, timestamp: Long) =
new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
bytes = value.toString.getBytes,
timestamp = timestamp,
magicValue = Message.MagicValue_V1))
}

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

@ -481,6 +481,7 @@ class KafkaConfigTest {
case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")

docs/design.html

@ -320,7 +320,7 @@ The compaction is done in the background by periodically recopying log segments.
Log compaction guarantees the following:
<ol>
<li>Any consumer that stays caught-up to within the head of the log will see every message that is written; these messages will have sequential offsets.
<li>Any consumer that stays caught-up to within the head of the log will see every message that is written; these messages will have sequential offsets. The topic's <code>min.compaction.lag.ms</code> can be used to guarantee the minimum length of time that must pass after a message is written before it can be compacted, i.e. it provides a lower bound on how long each message will remain in the (uncompacted) head.
<li>Ordering of messages is always maintained. Compaction will never re-order messages, just remove some.
<li>The offset for a message never changes. It is the permanent identifier for a position in the log.
<li>Any read progressing from offset 0 will see at least the final state of all records in the order they were written. All delete markers for deleted records will be seen provided the reader reaches the head of the log in a time period less than the topic's delete.retention.ms setting (the default is 24 hours). This is important as delete marker removal happens concurrently with read (and thus it is important that we not remove any delete marker prior to the reader seeing it).
@ -344,13 +344,14 @@ To enable log cleaning on a particular topic you can add the log-specific proper
<pre> log.cleanup.policy=compact</pre>
This can be done either at topic creation time or using the alter topic command.
<p>
The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag.
<pre> log.cleaner.min.compaction.lag.ms</pre>
This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.
</p>
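<p>
For example (illustrative value), a broker-wide default lag of one hour could be configured as:
<pre> log.cleaner.min.compaction.lag.ms=3600000</pre>
An individual topic can override this default through its <code>min.compaction.lag.ms</code> property.
</p>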
<p>
Further cleaner configurations are described <a href="/documentation.html#brokerconfigs">here</a>.
<h4><a id="design_compactionlimitations" href="#design_compactionlimitations">Log Compaction Limitations</a></h4>
<ol>
<li>You cannot configure yet how much log is retained without compaction (the "head" of the log). Currently all segments are eligible except for the last segment, i.e. the one currently being written to.</li>
</ol>
<h3><a id="design_quotas" href="#design_quotas">4.9 Quotas</a></h3>
<p>
Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are basically byte-rate thresholds defined per client-id. A client-id logically identifies an application making a request. Hence a single client-id can span multiple producer and consumer instances and the quota will apply for all of them as a single entity i.e. if client-id="test-client" has a produce quota of 10MB/sec, this is shared across all instances with that same id.