KAFKA-1414; Speed up broker startup after hard reset; patched by Anton Karamanov; reviewed by Jay Kreps and Jun Rao

Anton Karamanov 2014-07-27 21:13:20 -07:00 committed by Jun Rao
parent fa34841d98
commit f489493c38
10 changed files with 203 additions and 167 deletions
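In outline, the patch replaces the sequential scan in LogManager.loadLogs with per-directory thread pools: each data directory gets a fixed pool of num.recovery.threads.per.data.dir workers, one recovery job is submitted per log directory, and the caller blocks on the futures, unwrapping ExecutionException so the original cause propagates. Below is a minimal, self-contained sketch of that pattern, not the committed code; recoverLog is a hypothetical stand-in for the recovery work the Log constructor performs.

    import java.io.File
    import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future}
    import scala.collection.mutable

    object ParallelRecoverySketch {
      // hypothetical stand-in for `new Log(...)`, whose constructor replays
      // segments past the recovery point
      def recoverLog(logDir: File): Unit =
        println("recovering " + logDir.getName + " on " + Thread.currentThread.getName)

      def loadAll(dataDirs: Seq[File], ioThreads: Int): Unit = {
        val pools = mutable.ArrayBuffer.empty[ExecutorService]
        val jobs = mutable.ArrayBuffer.empty[Future[_]]
        try {
          for (dataDir <- dataDirs) {
            val pool = Executors.newFixedThreadPool(ioThreads) // one pool per data dir
            pools += pool
            for {
              contents <- Option(dataDir.listFiles).toList     // listFiles may return null
              logDir <- contents if logDir.isDirectory
            } jobs += pool.submit(new Runnable { def run() = recoverLog(logDir) })
          }
          jobs.foreach(_.get)                                  // wait for every job
        } catch {
          case e: ExecutionException => throw e.getCause       // surface the real failure
        } finally {
          pools.foreach(_.shutdown())
        }
      }
    }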

View File

@@ -62,6 +62,10 @@ log.dirs=/tmp/kafka-logs
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# Increasing this value is recommended for installations with data directories located in a RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync

View File

@@ -23,6 +23,7 @@ import kafka.utils._
import scala.collection._
import kafka.common.{TopicAndPartition, KafkaException}
import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future}
/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -39,6 +40,7 @@ class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
@@ -54,7 +56,7 @@ class LogManager(val logDirs: Array[File],
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
loadLogs(logDirs)
loadLogs()
private val cleaner: LogCleaner =
if(cleanerConfig.enableCleaner)
@@ -101,36 +103,71 @@ class LogManager(val logDirs: Array[File],
/**
* Recover and load all logs in the given data directories
*/
private def loadLogs(dirs: Seq[File]) {
for(dir <- dirs) {
val recoveryPoints = this.recoveryPointCheckpoints(dir).read
/* load the logs */
val subDirs = dir.listFiles()
if(subDirs != null) {
val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
if(cleanShutDownFile.exists())
info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
else
brokerState.newState(RecoveringFromUncleanShutdown)
private def loadLogs(): Unit = {
info("Loading logs.")
for(dir <- subDirs) {
if(dir.isDirectory) {
info("Loading log '" + dir.getName + "'")
val topicPartition = Log.parseTopicPartitionName(dir.getName)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val log = new Log(dir,
config,
recoveryPoints.getOrElse(topicPartition, 0L),
scheduler,
time)
val previous = this.logs.put(topicPartition, log)
if(previous != null)
throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
for (dir <- this.logDirs) {
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
if (cleanShutdownFile.exists) {
debug(
"Found clean shutdown file. " +
"Skipping recovery for all logs in data directory: " +
dir.getAbsolutePath)
} else {
// log recovery itself is performed by the `Log` class during initialization
brokerState.newState(RecoveringFromUncleanShutdown)
}
val recoveryPoints = this.recoveryPointCheckpoints(dir).read
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
Utils.runnable {
debug("Loading log '" + logDir.getName + "'")
val topicPartition = Log.parseTopicPartitionName(logDir.getName)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
val previous = this.logs.put(topicPartition, current)
if (previous != null) {
throw new IllegalArgumentException(
"Duplicate log directories found: %s, %s!".format(
current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
}
cleanShutDownFile.delete()
}
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
}
try {
for ((cleanShutdownFile, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
cleanShutdownFile.delete()
}
} catch {
case e: ExecutionException => {
error("There was an error in one of the threads during logs loading: " + e.getCause)
throw e.getCause
}
} finally {
threadPools.foreach(_.shutdown())
}
info("Logs loading complete.")
}
/**
@@ -160,31 +197,69 @@ class LogManager(val logDirs: Array[File],
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
/**
* Close all the logs
*/
def shutdown() {
info("Shutting down.")
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
// stop the cleaner first
if (cleaner != null) {
Utils.swallow(cleaner.shutdown())
}
// close logs in each dir
for (dir <- this.logDirs) {
debug("Flushing and closing logs at " + dir)
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
val jobsForDir = logsInDir map { log =>
Utils.runnable {
// flush the log to ensure latest possible recovery point
log.flush()
log.close()
}
}
jobs(dir) = jobsForDir.map(pool.submit).toSeq
}
try {
// stop the cleaner first
if(cleaner != null)
Utils.swallow(cleaner.shutdown())
// flush the logs to ensure latest possible recovery point
allLogs.foreach(_.flush())
// close the logs
allLogs.foreach(_.close())
// update the last flush point
checkpointRecoveryPointOffsets()
// mark that the shutdown was clean by creating the clean shutdown marker file
logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()))
for ((dir, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
// update the last flush point
debug("Updating recovery points at " + dir)
checkpointLogsInDir(dir)
// mark that the shutdown was clean by creating marker file
debug("Writing clean shutdown marker at " + dir)
Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())
}
} catch {
case e: ExecutionException => {
error("There was an error in one of the threads during LogManager shutdown: " + e.getCause)
throw e.getCause
}
} finally {
threadPools.foreach(_.shutdown())
// regardless of whether the close succeeded, we need to unlock the data directories
dirLocks.foreach(_.destroy())
}
info("Shutdown complete.")
}
/**
* Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
*
@@ -230,14 +305,19 @@ class LogManager(val logDirs: Array[File],
* to avoid recovering the whole log on startup.
*/
def checkpointRecoveryPointOffsets() {
val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString)
for(dir <- logDirs) {
val recoveryPoints = recoveryPointsByDir.get(dir.toString)
if(recoveryPoints.isDefined)
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
this.logDirs.foreach(checkpointLogsInDir)
}
/**
* Make a checkpoint for all logs in the provided directory.
*/
private def checkpointLogsInDir(dir: File): Unit = {
val recoveryPoints = this.logsByDir.get(dir.toString)
if (recoveryPoints.isDefined) {
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}
}
/**
* Get the log if it exists, otherwise return None
*/
@@ -366,12 +446,21 @@ class LogManager(val logDirs: Array[File],
* Get all the partition logs
*/
def allLogs(): Iterable[Log] = logs.values
/**
* Get a map of TopicAndPartition => Log
*/
def logsByTopicPartition = logs.toMap
/**
* Map of log dir to logs by topic and partitions in that dir
*/
private def logsByDir = {
this.logsByTopicPartition.groupBy {
case (_, log) => log.dir.getParent
}
}
/**
* Flush any log which has exceeded its flush interval and has unwritten messages.
*/

View File

@@ -190,6 +190,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */
val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue))
/* the number of threads per data directory to be used for log recovery at startup and flushing at shutdown */
val numRecoveryThreadsPerDataDir = props.getIntInRange("num.recovery.threads.per.data.dir", 1, (1, Int.MaxValue))
/* enable auto creation of topic on the server */
val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
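For illustration only, an installation whose log directories sit on a RAID array might raise the new setting in server.properties; the value 8 below is an arbitrary example, not a recommendation from the patch:

    num.recovery.threads.per.data.dir=8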

View File

@@ -303,6 +303,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
topicConfigs = configs,
defaultConfig = defaultLogConfig,
cleanerConfig = cleanerConfig,
ioThreads = config.numRecoveryThreadsPerDataDir,
flushCheckMs = config.logFlushSchedulerIntervalMs,
flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,

View File

@@ -93,16 +93,14 @@ class KafkaScheduler(val threads: Int,
debug("Scheduling task %s with initial delay %d ms and period %d ms."
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
ensureStarted
val runnable = new Runnable {
def run() = {
try {
trace("Begining execution of scheduled task '%s'.".format(name))
fun()
} catch {
case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
} finally {
trace("Completed execution of scheduled task '%s'.".format(name))
}
val runnable = Utils.runnable {
try {
trace("Begining execution of scheduled task '%s'.".format(name))
fun()
} catch {
case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
} finally {
trace("Completed execution of scheduled task '%s'.".format(name))
}
}
if(period >= 0)

View File

@@ -49,9 +49,9 @@ object Utils extends Logging {
* @param fun A function
* @return A Runnable that just executes the function
*/
def runnable(fun: () => Unit): Runnable =
new Runnable() {
def run() = fun()
def runnable(fun: => Unit): Runnable =
new Runnable {
def run() = fun
}
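The change from `fun: () => Unit` to the by-name parameter `fun: => Unit` is what allows call sites such as `Utils.runnable { ... }` to pass a bare block. A small usage sketch, assuming only the method shown here:

    // the block is captured unevaluated and runs on each call to run();
    // with the old signature this would have been Utils.runnable(() => ...)
    val job: Runnable = Utils.runnable {
      println("executed by the worker thread, not at construction time")
    }
    new Thread(job).start()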
/**

View File

@@ -35,21 +35,11 @@ class LogManagerTest extends JUnit3Suite {
var logManager: LogManager = null
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
val cleanerConfig = CleanerConfig(enableCleaner = false)
override def setUp() {
super.setUp()
logDir = TestUtils.tempDir()
logManager = new LogManager(logDirs = Array(logDir),
topicConfigs = Map(),
defaultConfig = logConfig,
cleanerConfig = cleanerConfig,
flushCheckMs = 1000L,
flushCheckpointMs = 100000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
time = time,
brokerState = new BrokerState())
logManager = createLogManager()
logManager.startup
logDir = logManager.logDirs(0)
}
@@ -125,18 +115,7 @@ class LogManagerTest extends JUnit3Suite {
logManager.shutdown()
val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
logManager = new LogManager(
logDirs = Array(logDir),
topicConfigs = Map(),
defaultConfig = config,
cleanerConfig = cleanerConfig,
flushCheckMs = 1000L,
flushCheckpointMs = 100000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
brokerState = new BrokerState(),
time = time
)
logManager = createLogManager()
logManager.startup
// create a log
@@ -176,18 +155,7 @@ class LogManagerTest extends JUnit3Suite {
def testTimeBasedFlush() {
logManager.shutdown()
val config = logConfig.copy(flushMs = 1000)
logManager = new LogManager(
logDirs = Array(logDir),
topicConfigs = Map(),
defaultConfig = config,
cleanerConfig = cleanerConfig,
flushCheckMs = 1000L,
flushCheckpointMs = 10000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
brokerState = new BrokerState(),
time = time
)
logManager = createLogManager()
logManager.startup
val log = logManager.createLog(TopicAndPartition(name, 0), config)
val lastFlush = log.lastFlushTime
@@ -209,19 +177,8 @@ class LogManagerTest extends JUnit3Suite {
TestUtils.tempDir(),
TestUtils.tempDir())
logManager.shutdown()
logManager = new LogManager(
logDirs = dirs,
topicConfigs = Map(),
defaultConfig = logConfig,
cleanerConfig = cleanerConfig,
flushCheckMs = 1000L,
flushCheckpointMs = 10000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
brokerState = new BrokerState(),
time = time
)
logManager = createLogManager()
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
logManager.createLog(TopicAndPartition("test", partition), logConfig)
@@ -237,18 +194,7 @@ class LogManagerTest extends JUnit3Suite {
@Test
def testTwoLogManagersUsingSameDirFails() {
try {
new LogManager(
logDirs = Array(logDir),
topicConfigs = Map(),
defaultConfig = logConfig,
cleanerConfig = cleanerConfig,
flushCheckMs = 1000L,
flushCheckpointMs = 10000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
brokerState = new BrokerState(),
time = time
)
createLogManager()
fail("Should not be able to create a second log manager instance with the same data directory")
} catch {
case e: KafkaException => // this is good
@@ -270,16 +216,8 @@ class LogManagerTest extends JUnit3Suite {
def testRecoveryDirectoryMappingWithTrailingSlash() {
logManager.shutdown()
logDir = TestUtils.tempDir()
logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)),
topicConfigs = Map(),
defaultConfig = logConfig,
cleanerConfig = cleanerConfig,
flushCheckMs = 1000L,
flushCheckpointMs = 100000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
time = time,
brokerState = new BrokerState())
logManager = TestUtils.createLogManager(
logDirs = Array(new File(logDir.getAbsolutePath + File.separator)))
logManager.startup
verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
}
@@ -293,16 +231,7 @@ class LogManagerTest extends JUnit3Suite {
logDir = new File("data" + File.separator + logDir.getName)
logDir.mkdirs()
logDir.deleteOnExit()
logManager = new LogManager(logDirs = Array(logDir),
topicConfigs = Map(),
defaultConfig = logConfig,
cleanerConfig = cleanerConfig,
flushCheckMs = 1000L,
flushCheckpointMs = 100000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
time = time,
brokerState = new BrokerState())
logManager = createLogManager()
logManager.startup
verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
}
@@ -327,4 +256,12 @@ class LogManagerTest extends JUnit3Suite {
}
}
}
private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = {
TestUtils.createLogManager(
defaultConfig = logConfig,
logDirs = logDirs,
time = this.time)
}
}

View File

@@ -32,16 +32,11 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
val topic = "foo"
val logManagers = configs.map(config => new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
topicConfigs = Map(),
defaultConfig = LogConfig(),
cleanerConfig = CleanerConfig(),
flushCheckMs = 30000,
flushCheckpointMs = 10000L,
retentionCheckMs = 30000,
scheduler = new KafkaScheduler(1),
brokerState = new BrokerState(),
time = new MockTime))
val logManagers = configs map { config =>
TestUtils.createLogManager(
logDirs = config.logDirs.map(new File(_)).toArray,
cleanerConfig = CleanerConfig())
}
@After
def teardown() {
@@ -147,4 +142,4 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L)
}
}
}

View File

@@ -18,7 +18,6 @@
package kafka.server
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import kafka.log.{CleanerConfig, LogManager, LogConfig}
import java.util.concurrent.atomic.AtomicBoolean
import java.io.File
@@ -37,7 +36,7 @@ class ReplicaManagerTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(1)
val config = new KafkaConfig(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
val partition = rm.getOrCreatePartition(topic, 1, 1)
@@ -51,26 +50,11 @@ class ReplicaManagerTest extends JUnit3Suite {
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = new KafkaConfig(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
val partition = rm.getOrCreatePartition(topic, 1, 1)
partition.getOrCreateReplica(1)
rm.checkpointHighWatermarks()
}
private def createLogManager(logDirs: Array[File]): LogManager = {
val time = new MockTime()
return new LogManager(logDirs,
topicConfigs = Map(),
defaultConfig = new LogConfig(),
cleanerConfig = CleanerConfig(enableCleaner = false),
flushCheckMs = 1000L,
flushCheckpointMs = 100000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
brokerState = new BrokerState(),
time = time)
}
}

View File

@@ -39,6 +39,7 @@ import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
import kafka.admin.AdminUtils
import kafka.producer.ProducerConfig
import kafka.log._
import junit.framework.AssertionFailedError
import junit.framework.Assert._
@@ -689,6 +690,30 @@ object TestUtils extends Logging {
def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = {
ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
}
/**
* Create a new LogManager instance with default configuration for testing
*/
def createLogManager(
logDirs: Array[File] = Array.empty[File],
defaultConfig: LogConfig = LogConfig(),
cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
time: MockTime = new MockTime()) =
{
new LogManager(
logDirs = logDirs,
topicConfigs = Map(),
defaultConfig = defaultConfig,
cleanerConfig = cleanerConfig,
ioThreads = 4,
flushCheckMs = 1000L,
flushCheckpointMs = 10000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
time = time,
brokerState = new BrokerState())
}
}
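With this helper, test setup collapses to a single call, as the updated call sites above show. For example (using the defaults: disabled cleaner, MockTime, four I/O threads):

    val logManager = TestUtils.createLogManager(logDirs = Array(TestUtils.tempDir()))
    logManager.startup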
object TestZKUtils {