From 16b0f44559de969f1fd4387dceda881e6aabdc1c Mon Sep 17 00:00:00 2001
From: Edward Jay Kreps
Date: Thu, 19 Jul 2012 21:02:15 +0000
Subject: [PATCH] KAFKA-371 Refactoring of LogManager. Reviewed by Neha.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1363542 13f79535-47bb-0310-9956-ffa450edef68
---
 .../src/main/scala/kafka/log/LogManager.scala | 97 +++++--------------
 .../main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../main/scala/kafka/server/KafkaServer.scala | 12 +--
 .../kafka/integration/TopicMetadataTest.scala |  3 +-
 4 files changed, 27 insertions(+), 87 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 8dba74ee19f..ed5b11a13be 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1,5 +1,5 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -40,12 +40,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
   private val numPartitions = config.numPartitions
   private val maxSize: Long = config.logFileSize
   private val flushInterval = config.flushInterval
-  private val topicPartitionsMap = config.topicPartitionsMap
   private val logCreationLock = new Object
-  private val startupLatch: CountDownLatch = new CountDownLatch(1)
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
-  private val logFlushIntervalMap = config.flushIntervalMap
-  private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
+  private val logFlushIntervals = config.flushIntervalMap
+  private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
   private val logRetentionSize = config.logRetentionSize
   private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
 
@@ -72,19 +70,11 @@ private[kafka] class LogManager(val config: KafkaConfig,
       }
     }
   }
 
-  
-  private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
-    var ret = new mutable.HashMap[String, Long]
-    for ( (topic, hour) <- logRetentionHourMap )
-      ret.put(topic, hour * 60 * 60 * 1000L)
-    ret
-  }
 
   /**
-   * Register this broker in ZK for the first time. 
+   * Start the log flush thread
    */
   def startup() {
-    /* Schedule the cleanup task to delete old logs */
     if(scheduler != null) {
       if(scheduler.hasShutdown) {
@@ -96,13 +86,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
     }
 
     if(logFlusherScheduler.hasShutdown)
       logFlusherScheduler.startUp
-    info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
+    info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervals)
     logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
-    startupLatch.countDown
-  }
-
-  private def awaitStartup() {
-    startupLatch.await
   }
 
@@ -112,9 +97,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
   private def createLog(topic: String, partition: Int): Log = {
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name can't be emtpy")
-    if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
+    if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
       val error = "Wrong partition %d, valid partitions (0, %d)."
-        .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
+        .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
       warn(error)
       throw new InvalidPartitionException(error)
     }
@@ -125,22 +110,6 @@ private[kafka] class LogManager(val config: KafkaConfig,
     }
   }
 
-  /**
-   * Return the Pool (partitions) for a specific log
-   */
-  private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
-    awaitStartup
-    if (topic.length <= 0)
-      throw new InvalidTopicException("topic name can't be empty")
-    if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
-      val error = "Wrong partition %d, valid partitions (0, %d)."
-        .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
-      warn(error)
-      throw new InvalidPartitionException(error)
-    }
-    logs.get(topic)
-  }
-
   def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
     val log = getLog(offsetRequest.topic, offsetRequest.partition)
     log match {
@@ -150,10 +119,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
   }
 
   /**
-   * Get the log if exists
+   * Get the log if it exists
    */
   def getLog(topic: String, partition: Int): Option[Log] = {
-    val parts = getLogPool(topic, partition)
+    val parts = logs.get(topic)
     if (parts == null) None
     else {
       val log = parts.get(partition)
@@ -167,7 +136,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
    */
   def getOrCreateLog(topic: String, partition: Int): Log = {
     var hasNewTopic = false
-    var parts = getLogPool(topic, partition)
+    var parts = logs.get(topic)
     if (parts == null) {
       val found = logs.putIfNotExists(topic, new Pool[Int, Log])
       if (found == null)
@@ -195,8 +164,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
     val topic = Utils.getTopicPartition(log.name)._1
-    val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
-    val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
+    val logCleanupThresholdMs = logRetentionMs.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+    val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMs)
     val total = log.deleteSegments(toBeDeleted)
     total
   }
@@ -226,11 +195,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
    */
   def cleanupLogs() {
     debug("Beginning log cleanup...")
-    val iter = getLogIterator
     var total = 0
     val startMs = time.milliseconds
-    while(iter.hasNext) {
-      val log = iter.next
+    for(log <- allLogs) {
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
@@ -245,41 +212,23 @@ private[kafka] class LogManager(val config: KafkaConfig,
     info("Closing log manager")
     scheduler.shutdown()
     logFlusherScheduler.shutdown()
-    val iter = getLogIterator
-    while(iter.hasNext)
-      iter.next.close()
+    allLogs.foreach(_.close())
   }
 
-  private def getLogIterator(): Iterator[Log] = {
-    new IteratorTemplate[Log] {
-      val partsIter = logs.values.iterator
-      var logIter: Iterator[Log] = null
-
-      override def makeNext(): Log = {
-        while (true) {
-          if (logIter != null && logIter.hasNext)
-            return logIter.next
-          if (!partsIter.hasNext)
-            return allDone
-          logIter = partsIter.next.values.iterator
-        }
-        // should never reach here
-        assert(false)
-        return allDone
-      }
-    }
-  }
+  /**
+   * Get all the partition logs
+   */
+  def allLogs() = logs.values.flatMap(_.values)
 
   private def flushAllLogs() = {
     debug("flushing the high watermark of all logs")
-
-    for (log <- getLogIterator)
+    for (log <- allLogs) {
       try{
         val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
         var logFlushInterval = config.defaultFlushIntervalMs
-        if(logFlushIntervalMap.contains(log.topicName))
-          logFlushInterval = logFlushIntervalMap(log.topicName)
+        if(logFlushIntervals.contains(log.topicName))
+          logFlushInterval = logFlushIntervals(log.topicName)
         debug(log.topicName + " flush interval " + logFlushInterval +
               " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
@@ -299,8 +248,6 @@ private[kafka] class LogManager(val config: KafkaConfig,
     }
   }
 
-  def getAllTopics(): Iterator[String] = logs.keys.iterator
-  def getTopicPartitionsMap() = topicPartitionsMap
+  def topics(): Iterable[String] = logs.keys
 
-  def getServerConfig: KafkaConfig = config
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7b450a70fc9..71f26963a87 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -381,7 +381,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestLogger.trace("Topic metadata request " + metadataRequest.toString())
 
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
-    val config = logManager.getServerConfig
     val zkClient = kafkaZookeeper.getZookeeperClient
     val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
 
@@ -391,6 +390,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case Some(metadata) => topicsMetadata += metadata
       case None =>
         /* check if auto creation of topics is turned on */
+        val config = logManager.config
         if(config.autoCreateTopics) {
           CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
           info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 56483d02731..1bcd0f191a3 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -64,6 +64,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                  1000L * 60 * config.logCleanupIntervalMinutes,
                                  1000L * 60 * 60 * config.logRetentionHours,
                                  needRecovery)
+    logManager.startup()
 
     socketServer = new SocketServer(config.port,
                                     config.numNetworkThreads,
@@ -78,19 +79,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
     requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
 
-    socketServer.startup
+    socketServer.startup()
     Mx4jLoader.maybeLoad
 
-    /**
-     * Registers this broker in ZK. After this, consumers can connect to broker.
-     * So this should happen after socket server start.
-     */
-    logManager.startup
-
     // starting relevant replicas and leader election for partitions assigned to this broker
-    kafkaZookeeper.startup
-
+    kafkaZookeeper.startup()
     kafkaController.startup()
 
     info("Server started.")
 
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index b26cf25c5f0..d8a7ed62840 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -83,7 +83,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
     val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
-    EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
+    EasyMock.expect(logManager.config).andReturn(configs.head)
     EasyMock.replay(logManager)
     EasyMock.replay(kafkaZookeeper)
 
@@ -118,7 +118,6 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
 
     // verify the expected calls to log manager occurred in the right order
-    EasyMock.verify(logManager)
     EasyMock.verify(kafkaZookeeper)
     EasyMock.verify(receivedRequest)
   }
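
Note on the collection refactor: the heart of the LogManager change is that the
topic -> partition -> Log nesting is now walked with a single flatMap
(def allLogs() = logs.values.flatMap(_.values)) instead of the removed hand-rolled
IteratorTemplate, and the retention map is derived functionally instead of built
with a mutable HashMap loop. Below is a minimal, self-contained Scala sketch of
both idioms; the nested immutable Maps standing in for Pool[Int, Log] and the
sample topic data are invented for illustration and are not Kafka code.

  object LogManagerIdioms {
    def main(args: Array[String]): Unit = {
      // Stand-in for LogManager.logs: one pool of partitions per topic,
      // modeled here as nested immutable Maps.
      val logs: Map[String, Map[Int, String]] = Map(
        "orders" -> Map(0 -> "orders-0", 1 -> "orders-1"),
        "clicks" -> Map(0 -> "clicks-0"))

      // The allLogs idiom: flatten topic -> partition pools into one collection.
      val allLogs: Iterable[String] = logs.values.flatMap(_.values)
      println(allLogs.mkString(", "))    // orders-0, orders-1, clicks-0

      // The logRetentionMs idiom: convert hours to milliseconds in one pass,
      // replacing the removed getLogRetentionMSMap and its mutable HashMap.
      val logRetentionHoursMap = Map("orders" -> 168, "clicks" -> 24)
      val logRetentionMs = logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
      println(logRetentionMs("orders"))  // 604800000
    }
  }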
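Note on the startup ordering: KafkaServer.startup() now calls
logManager.startup() before the socket server begins accepting connections,
which is what allows the startupLatch/awaitStartup handshake to be deleted from
LogManager: no request thread can touch the logs before they are ready. The
sketch below uses invented names rather than the real Kafka classes and only
illustrates the invariant that replaces the latch.

  object StartupOrdering {
    class LogManagerSketch {
      @volatile private var started = false
      // In the patch, startup() only starts the flush and cleanup schedulers.
      def startup(): Unit = { started = true }
      // getLog no longer blocks on a CountDownLatch; being called before
      // startup() is now a server-ordering bug rather than something to wait out.
      def getLog(topic: String): Option[String] = {
        require(started, "LogManager used before startup()")
        Some("log-" + topic)
      }
    }

    def main(args: Array[String]): Unit = {
      val logManager = new LogManagerSketch
      logManager.startup()   // 1. logs and schedulers ready first
      // 2. only then would socketServer.startup() start serving requests
      println(logManager.getLog("orders"))
    }
  }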