KAFKA-371 Refactoring of LogManager. Reviewed by Neha.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1363542 13f79535-47bb-0310-9956-ffa450edef68
Edward Jay Kreps 2012-07-19 21:02:15 +00:00
parent 9e44db6af0
commit 16b0f44559
4 changed files with 27 additions and 87 deletions

LogManager.scala

@@ -40,12 +40,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
private val numPartitions = config.numPartitions
private val maxSize: Long = config.logFileSize
private val flushInterval = config.flushInterval
private val topicPartitionsMap = config.topicPartitionsMap
private val logCreationLock = new Object
private val startupLatch: CountDownLatch = new CountDownLatch(1)
private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
private val logFlushIntervalMap = config.flushIntervalMap
private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
private val logFlushIntervals = config.flushIntervalMap
private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
private val logRetentionSize = config.logRetentionSize
private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
@@ -73,18 +71,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
}
private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
var ret = new mutable.HashMap[String, Long]
for ( (topic, hour) <- logRetentionHourMap )
ret.put(topic, hour * 60 * 60 * 1000L)
ret
}
/**
* Register this broker in ZK for the first time.
* Start the log flush thread
*/
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
if(scheduler.hasShutdown) {
@@ -96,13 +86,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
if(logFlusherScheduler.hasShutdown) logFlusherScheduler.startUp
info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervals)
logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
startupLatch.countDown
}
private def awaitStartup() {
startupLatch.await
}
@@ -112,9 +97,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
private def createLog(topic: String, partition: Int): Log = {
if (topic.length <= 0)
throw new InvalidTopicException("topic name can't be empty")
if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
val error = "Wrong partition %d, valid partitions (0, %d)."
.format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
.format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
warn(error)
throw new InvalidPartitionException(error)
}
@@ -125,22 +110,6 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
}
/**
* Return the Pool (partitions) for a specific log
*/
private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
awaitStartup
if (topic.length <= 0)
throw new InvalidTopicException("topic name can't be empty")
if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
val error = "Wrong partition %d, valid partitions (0, %d)."
.format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
warn(error)
throw new InvalidPartitionException(error)
}
logs.get(topic)
}
def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
val log = getLog(offsetRequest.topic, offsetRequest.partition)
log match {
@@ -150,10 +119,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
/**
* Get the log if exists
* Get the log if it exists
*/
def getLog(topic: String, partition: Int): Option[Log] = {
val parts = getLogPool(topic, partition)
val parts = logs.get(topic)
if (parts == null) None
else {
val log = parts.get(partition)
@@ -167,7 +136,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
*/
def getOrCreateLog(topic: String, partition: Int): Log = {
var hasNewTopic = false
var parts = getLogPool(topic, partition)
var parts = logs.get(topic)
if (parts == null) {
val found = logs.putIfNotExists(topic, new Pool[Int, Log])
if (found == null)
@@ -195,8 +164,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
val topic = Utils.getTopicPartition(log.name)._1
val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
val logCleanupThresholdMs = logRetentionMs.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMs)
val total = log.deleteSegments(toBeDeleted)
total
}
@@ -226,11 +195,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
*/
def cleanupLogs() {
debug("Beginning log cleanup...")
val iter = getLogIterator
var total = 0
val startMs = time.milliseconds
while(iter.hasNext) {
val log = iter.next
for(log <- allLogs) {
debug("Garbage collecting '" + log.name + "'")
total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
@@ -245,41 +212,23 @@ private[kafka] class LogManager(val config: KafkaConfig,
info("Closing log manager")
scheduler.shutdown()
logFlusherScheduler.shutdown()
val iter = getLogIterator
while(iter.hasNext)
iter.next.close()
allLogs.foreach(_.close())
}
private def getLogIterator(): Iterator[Log] = {
new IteratorTemplate[Log] {
val partsIter = logs.values.iterator
var logIter: Iterator[Log] = null
override def makeNext(): Log = {
while (true) {
if (logIter != null && logIter.hasNext)
return logIter.next
if (!partsIter.hasNext)
return allDone
logIter = partsIter.next.values.iterator
}
// should never reach here
assert(false)
return allDone
}
}
}
/**
* Get all the partition logs
*/
def allLogs() = logs.values.flatMap(_.values)
private def flushAllLogs() = {
debug("flushing the high watermark of all logs")
for (log <- getLogIterator)
for (log <- allLogs)
{
try{
val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
var logFlushInterval = config.defaultFlushIntervalMs
if(logFlushIntervalMap.contains(log.topicName))
logFlushInterval = logFlushIntervalMap(log.topicName)
if(logFlushIntervals.contains(log.topicName))
logFlushInterval = logFlushIntervals(log.topicName)
debug(log.topicName + " flush interval " + logFlushInterval +
" last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= logFlushInterval)
@@ -299,8 +248,6 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
def getAllTopics(): Iterator[String] = logs.keys.iterator
def getTopicPartitionsMap() = topicPartitionsMap
def topics(): Iterable[String] = logs.keys
def getServerConfig: KafkaConfig = config
}
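
The core cleanup in LogManager is replacing the hand-rolled IteratorTemplate over the nested topic/partition pools with a single collection expression, def allLogs() = logs.values.flatMap(_.values), and inlining getLogRetentionMSMap as a map over config.logRetentionHoursMap. Below is a minimal sketch of the flatMap equivalence, assuming plain Scala Maps as a stand-in for kafka.utils.Pool; AllLogsSketch, the Log stub, and the sample topic names are illustrative only, not Kafka classes.

// Sketch only: immutable Maps stand in for Pool[String, Pool[Int, Log]].
object AllLogsSketch extends App {
  case class Log(name: String)

  val logs: Map[String, Map[Int, Log]] = Map(
    "orders" -> Map(0 -> Log("orders-0"), 1 -> Log("orders-1")),
    "clicks" -> Map(0 -> Log("clicks-0")))

  // Old shape: walk the outer iterator, then each inner iterator by hand,
  // which is what the removed IteratorTemplate did.
  val manual = for (parts <- logs.values.iterator; log <- parts.values.iterator) yield log

  // New shape, as in this commit: def allLogs() = logs.values.flatMap(_.values)
  val flattened = logs.values.flatMap(_.values)

  assert(manual.toSet == flattened.toSet)
  flattened.foreach(log => println(log.name))
}

With the flat view in place, cleanupLogs, close and flushAllLogs can iterate allLogs directly, which is what the hunks above show.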

KafkaApis.scala

@@ -381,7 +381,6 @@ class KafkaApis(val requestChannel: RequestChannel,
requestLogger.trace("Topic metadata request " + metadataRequest.toString())
val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
val config = logManager.getServerConfig
val zkClient = kafkaZookeeper.getZookeeperClient
val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
@@ -391,6 +390,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case Some(metadata) => topicsMetadata += metadata
case None =>
/* check if auto creation of topics is turned on */
val config = logManager.config
if(config.autoCreateTopics) {
CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"

KafkaServer.scala

@@ -64,6 +64,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
1000L * 60 * config.logCleanupIntervalMinutes,
1000L * 60 * 60 * config.logRetentionHours,
needRecovery)
logManager.startup()
socketServer = new SocketServer(config.port,
config.numNetworkThreads,
@@ -78,19 +79,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
socketServer.startup
socketServer.startup()
Mx4jLoader.maybeLoad
/**
* Registers this broker in ZK. After this, consumers can connect to broker.
* So this should happen after socket server start.
*/
logManager.startup
// starting relevant replicas and leader election for partitions assigned to this broker
kafkaZookeeper.startup
kafkaZookeeper.startup()
kafkaController.startup()
info("Server started.")

TopicMetadataTest.scala

@@ -83,7 +83,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
EasyMock.expect(logManager.config).andReturn(configs.head)
EasyMock.replay(logManager)
EasyMock.replay(kafkaZookeeper)
@@ -118,7 +118,6 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
// verify the expected calls to log manager occurred in the right order
EasyMock.verify(logManager)
EasyMock.verify(kafkaZookeeper)
EasyMock.verify(receivedRequest)
}
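
The test change stubs the now-public config val instead of the removed getServerConfig accessor. For reference, here is a minimal sketch of the same EasyMock expect/replay/verify pattern, assuming hypothetical Service and ServerConfig classes rather than Kafka's, and EasyMock 3.x class mocking.

import org.easymock.EasyMock

class ServerConfig(val brokerId: Int)
class Service { def config: ServerConfig = new ServerConfig(-1) }

object EasyMockSketch extends App {
  // Record phase: create a class mock and stub the accessor, as the test above does for LogManager.
  val service = EasyMock.createMock(classOf[Service])
  EasyMock.expect(service.config).andReturn(new ServerConfig(0))
  EasyMock.replay(service)

  // Replay phase: the mocked call returns the stubbed value.
  assert(service.config.brokerId == 0)

  // Verify phase: fails if an expected call never happened.
  EasyMock.verify(service)
}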