KAFKA-1289 Misc. nitpicks in log cleaner for new 0.8.1 features; patch by Jay Kreps, reviewed by Sriram Subramanian and Jun Rao

Joe Stein 2014-03-04 15:30:59 -05:00
parent 40c6555eb0
commit 77118a935e
12 changed files with 30 additions and 22 deletions


@@ -73,8 +73,6 @@ log4j.additivity.kafka.controller=false
 log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
 log4j.additivity.kafka.log.LogCleaner=false
-log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender
-log4j.additivity.kafka.log.Cleaner=false
 
 log4j.logger.state.change.logger=TRACE, stateChangeAppender
 log4j.additivity.state.change.logger=false
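The two kafka.log.Cleaner entries disappear from the example log4j configuration because this patch makes the cleaner's helper classes log under kafka.log.LogCleaner (via the loggerName overrides below), so the remaining LogCleaner entry alone routes all cleaner output to the cleanerAppender; see the sketch after the LogCleaner hunks.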


@@ -40,7 +40,7 @@ port=9092
 num.network.threads=2
 
 # The number of threads doing disk I/O
-num.io.threads=2
+num.io.threads=8
 
 # The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=1048576
@@ -100,6 +100,10 @@ log.segment.bytes=536870912
 # to the retention policies
 log.retention.check.interval.ms=60000
 
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
 ############################# Zookeeper #############################
 
 # Zookeeper connection string (see zookeeper docs for details).
@@ -111,6 +115,3 @@ zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=1000000
-
-log.cleanup.policy=delete
-
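Three changes to the sample broker config: the num.io.threads default rises from 2 to 8, a documented log.cleaner.enable=false switch is added as the master toggle for compaction, and the trailing log.cleanup.policy=delete line is dropped since deletion is already the default retention behavior when the cleaner is disabled.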


@@ -35,7 +35,7 @@ case class CleanerConfig(val numThreads: Int = 1,
                          val ioBufferSize: Int = 1024*1024,
                          val maxMessageSize: Int = 32*1024*1024,
                          val maxIoBytesPerSecond: Double = Double.MaxValue,
-                         val backOffMs: Long = 60 * 1000,
+                         val backOffMs: Long = 15 * 1000,
                          val enableCleaner: Boolean = true,
                          val hashAlgorithm: String = "MD5") {
 }
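The default cleaner backoff drops from 60 to 15 seconds, so an idle cleaner thread re-checks for dirty logs four times as often. A minimal usage sketch (the overridden value is illustrative, not from this patch):

    // Keep the new 15s backoff default but override the thread count.
    val cleanerConfig = CleanerConfig(numThreads = 2)
    assert(cleanerConfig.backOffMs == 15 * 1000)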


@@ -131,6 +131,9 @@ class LogCleaner(val config: CleanerConfig,
    */
   private class CleanerThread(threadId: Int)
     extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
+
+    override val loggerName = classOf[LogCleaner].getName
+
     if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
       warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
@@ -185,7 +188,7 @@ class LogCleaner(val config: CleanerConfig,
   def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
     def mb(bytes: Double) = bytes / (1024*1024)
     val message =
-      "%n\tLog cleaner %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
+      "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
       "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),
                                                                               stats.elapsedSecs,
                                                                               mb(stats.bytesRead/stats.elapsedSecs)) +
@@ -222,6 +225,8 @@ private[log] class Cleaner(val id: Int,
                            throttler: Throttler,
                            time: Time,
                            checkDone: (TopicAndPartition) => Unit) extends Logging {
+
+  override val loggerName = classOf[LogCleaner].getName
 
   this.logIdent = "Cleaner " + id + ": "
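Both additions use the same trick: a helper class overrides the loggerName that Kafka's Logging trait would otherwise derive from the class itself, so Cleaner and CleanerThread messages show up under the LogCleaner logger that log4j already routes to the cleanerAppender. A self-contained sketch of the pattern (this Logging trait is a stand-in for kafka.utils.Logging, not its real implementation):

    // Stand-in Logging trait: the logger name defaults to the concrete class.
    trait Logging {
      val loggerName: String = this.getClass.getName
      def info(msg: => String): Unit = println("[" + loggerName + "] " + msg)
    }

    class LogCleaner extends Logging

    // The helper reports under LogCleaner's logger instead of its own,
    // so a single log4j entry covers both classes.
    class Cleaner extends Logging {
      override val loggerName = classOf[LogCleaner].getName
      def clean(): Unit = info("cleaned a log")
    }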


@@ -40,6 +40,9 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  * requested to be resumed.
  */
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging {
+
+  override val loggerName = classOf[LogCleaner].getName
+
   /* the offset checkpoints holding the last cleaned point for each log */
   private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
@@ -65,7 +68,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val cleanableLogs = logs.filter(l => l._2.config.dedupe)      // skip any logs marked for delete rather than dedupe
+      val cleanableLogs = logs.filter(l => l._2.config.compact)     // skip any logs marked for delete rather than dedupe
                               .filterNot(l => inProgress.contains(l._1))                      // skip any logs already in-progress
                               .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
       val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)   // must have some bytes
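grabFilthiestLog now selects on the renamed config.compact flag (the inline comment still says "dedupe"), and LogCleanerManager gains the same loggerName override. The selection pipeline is otherwise unchanged: keep only compacted logs, skip those already being cleaned, and consider only those with dirty bytes.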


@@ -34,7 +34,7 @@ import kafka.common._
  * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
  * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
- * @param dedupe Should old segments in this log be deleted or deduplicated?
+ * @param compact Should old segments in this log be deleted or deduplicated?
  */
 case class LogConfig(val segmentSize: Int = 1024*1024,
                      val segmentMs: Long = Long.MaxValue,
@@ -48,7 +48,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
                      val fileDeleteDelayMs: Long = 60*1000,
                      val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
                      val minCleanableRatio: Double = 0.5,
-                     val dedupe: Boolean = false) {
+                     val compact: Boolean = false) {
 
   def toProps: Properties = {
     val props = new Properties()
@@ -65,7 +65,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
     props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
     props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
-    props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
+    props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
     props
   }
@@ -117,7 +117,7 @@ object LogConfig {
               fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
               deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
               minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
-               dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
+               compact = props.getProperty(CleanupPolicyProp).trim.toLowerCase != "delete")
   }
 
   /**
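The rename is not purely mechanical: toProps now writes the policy out as "compact", while fromProps treats anything other than "delete" as enabling compaction, so configs persisted with the old "dedupe" value still parse correctly. A small sketch of that round-trip (helper names are illustrative, not Kafka's API):

    // Write side: the boolean serializes as "compact" or "delete".
    def toPolicy(compact: Boolean): String = if (compact) "compact" else "delete"
    // Read side: anything that is not "delete" enables compaction,
    // which keeps configs written with the legacy "dedupe" value working.
    def fromPolicy(policy: String): Boolean = policy.trim.toLowerCase != "delete"

    assert(fromPolicy(toPolicy(true)))  // the new value round-trips
    assert(fromPolicy("dedupe"))        // the legacy value still compacts
    assert(!fromPolicy("delete"))       // deletion is unchanged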


@@ -351,7 +351,7 @@ class LogManager(val logDirs: Array[File],
     debug("Beginning log cleanup...")
     var total = 0
     val startMs = time.milliseconds
-    for(log <- allLogs; if !log.config.dedupe) {
+    for(log <- allLogs; if !log.config.compact) {
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
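The same rename on the retention path: size- and time-based cleanup now skips any log with config.compact set, leaving those segments to the log cleaner rather than the garbage collector.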


@@ -137,7 +137,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d)
 
   /* the amount of time to sleep when there are no logs to clean */
-  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue))
+  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 15*1000, (0L, Long.MaxValue))
 
   /* the minimum ratio of dirty log to total log for a log to eligible for cleaning */
   val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5)
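The broker-level default for log.cleaner.backoff.ms drops from 30 to 15 seconds, matching the new CleanerConfig default above so the two defaults no longer disagree.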


@@ -262,7 +262,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                        deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                        fileDeleteDelayMs = config.logDeleteDelayMs,
                                        minCleanableRatio = config.logCleanerMinCleanRatio,
-                                       dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
+                                       compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
     val defaultProps = defaultLogConfig.toProps
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
     // read the log configurations from zookeeper
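Note the asymmetry with LogConfig.fromProps: the broker-wide log.cleanup.policy must be exactly "compact" to enable compaction in the default log config, while per-topic values parsed by fromProps treat anything other than "delete" as compact.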


@@ -243,11 +243,11 @@ object TestLogCleaning {
                           percentDeletes: Int): File = {
     val producerProps = new Properties
     producerProps.setProperty("producer.type", "async")
-    producerProps.setProperty("broker.list", brokerUrl)
+    producerProps.setProperty("metadata.broker.list", brokerUrl)
     producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
-    producerProps.setProperty("batch.size", 1000.toString)
+    producerProps.setProperty("batch.num.messages", 1000.toString)
     val producer = new Producer[String, String](new ProducerConfig(producerProps))
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
@@ -275,8 +275,9 @@ object TestLogCleaning {
   def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
-    consumerProps.setProperty("zk.connect", zkUrl)
-    consumerProps.setProperty("consumer.timeout.ms", (10*1000).toString)
+    consumerProps.setProperty("zookeeper.connect", zkUrl)
+    consumerProps.setProperty("consumer.timeout.ms", (20*1000).toString)
+    consumerProps.setProperty("auto.offset.reset", "smallest")
     new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
   }
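The test tool moves to the current client property names (metadata.broker.list, batch.num.messages, and zookeeper.connect replace their older spellings), doubles the consumer timeout to 20 seconds, and sets auto.offset.reset=smallest so the consumer reads the compacted log from the beginning.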


@@ -33,7 +33,7 @@ import kafka.message._
 class CleanerTest extends JUnitSuite {
 
   val dir = TestUtils.tempDir()
-  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, dedupe=true)
+  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, compact=true)
   val time = new MockTime()
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)


@@ -101,7 +101,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
       val log = new Log(dir = dir,
-                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
+                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, compact = true),
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)
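Both cleaner test suites need only the mechanical constructor change, passing compact = true to LogConfig where they previously passed dedupe = true.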