KAFKA-405 Improve high watermark maintenance to store high watermarks for all partitions in a single .highwatermark file; patched by Neha Narkhede; reviewed by Jay Kreps and Jun Rao

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1365841 13f79535-47bb-0310-9956-ffa450edef68
Neha Narkhede 2012-07-25 23:42:07 +00:00
parent 842da7bfcd
commit b49de724f1
20 changed files with 577 additions and 296 deletions

View File

@ -56,7 +56,9 @@ class Partition(val topic: String,
assignedReplicas
}
def getReplica(replicaId: Int): Option[Replica] = assignedReplicas().find(_.brokerId == replicaId)
def getReplica(replicaId: Int): Option[Replica] = {
assignedReplicas().find(_.brokerId == replicaId)
}
def addReplica(replica: Replica): Boolean = {
if(!assignedReplicas.contains(replica)) {
@ -65,8 +67,7 @@ class Partition(val topic: String,
}else false
}
def updateReplicaLEO(replica: Replica, leo: Long) {
replica.leoUpdateTime = time.milliseconds
def updateReplicaLeo(replica: Replica, leo: Long) {
replica.logEndOffset(Some(leo))
debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId))
}
@ -108,7 +109,7 @@ class Partition(val topic: String,
val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset() < leaderReplica().logEndOffset())
info("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId,
possiblyStuckReplicas.map(_.brokerId).mkString(",")))
val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime() < (time.milliseconds - keepInSyncTimeMs))
val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime < (time.milliseconds - keepInSyncTimeMs))
info("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
val leader = leaderReplica()
// Case 2 above

View File

@ -18,43 +18,38 @@
package kafka.cluster
import kafka.log.Log
import kafka.utils.Logging
import kafka.utils.{SystemTime, Time, Logging}
import kafka.common.KafkaException
class Replica(val brokerId: Int,
val partition: Partition,
val topic: String,
var log: Option[Log] = None,
var leoUpdateTime: Long = -1L) extends Logging {
time: Time = SystemTime,
var hw: Option[Long] = None,
var log: Option[Log] = None) extends Logging {
private var logEndOffset: Long = -1L
private var logEndOffsetUpdateTimeMs: Long = -1L
def logEndOffset(newLeo: Option[Long] = None): Long = {
isLocal match {
case true =>
newLeo match {
case Some(newOffset) => throw new KafkaException("Trying to set the leo %d for local log".format(newOffset))
case Some(newOffset) => logEndOffsetUpdateTimeMs = time.milliseconds; newOffset
case None => log.get.logEndOffset
}
case false =>
newLeo match {
case Some(newOffset) =>
logEndOffset = newOffset
logEndOffsetUpdateTimeMs = time.milliseconds
trace("Setting log end offset for replica %d for topic %s partition %d to %d"
.format(brokerId, topic, partition.partitionId, logEndOffset))
logEndOffset
case None => logEndOffset
}
}
}
def logEndOffsetUpdateTime(time: Option[Long] = None): Long = {
time match {
case Some(t) =>
leoUpdateTime = t
leoUpdateTime
case None =>
leoUpdateTime
}
}
def isLocal: Boolean = {
log match {
case Some(l) => true
@ -62,6 +57,8 @@ class Replica(val brokerId: Int,
}
}
def logEndOffsetUpdateTime = logEndOffsetUpdateTimeMs
def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = {
highwaterMarkOpt match {
case Some(highwaterMark) =>
@ -69,7 +66,7 @@ class Replica(val brokerId: Int,
case true =>
trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId,
brokerId, highwaterMark))
log.get.setHW(highwaterMark)
hw = Some(highwaterMark)
highwaterMark
case false => throw new KafkaException("Unable to set highwatermark for topic %s ".format(topic) +
"partition %d on broker %d, since there is no local log for this partition"
@ -78,7 +75,11 @@ class Replica(val brokerId: Int,
case None =>
isLocal match {
case true =>
log.get.getHW()
hw match {
case Some(highWatermarkValue) => highWatermarkValue
case None => throw new KafkaException("HighWatermark does not exist for topic %s ".format(topic) +
" partition %d on broker %d but local log exists".format(partition.partitionId, brokerId))
}
case false => throw new KafkaException("Unable to get highwatermark for topic %s ".format(topic) +
"partition %d on broker %d, since there is no local log for this partition"
.format(partition.partitionId, brokerId))
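Replica now folds getter and setter into a single Option-valued parameter: pass Some(value) to set, pass nothing to get. A minimal sketch of that calling convention (the replica value and offsets here are illustrative):
  replica.logEndOffset(Some(100L)) // remote replica: caches the leo and stamps logEndOffsetUpdateTimeMs
  val leo = replica.logEndOffset() // local replica: delegates to log.get.logEndOffset
  replica.highWatermark(Some(50L)) // setter throws KafkaException unless a local log exists
  val hw = replica.highWatermark() // getter throws KafkaException if no hw was ever set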

View File

@ -93,9 +93,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var fetcher: Option[ConsumerFetcherManager] = None
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
// topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
private val scheduler = new KafkaScheduler(1)
private val messageStreamCreated = new AtomicBoolean(false)
private var sessionExpirationListener: ZKSessionExpireListener = null
@ -121,8 +120,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
connectZk()
createFetcher()
if (config.autoCommit) {
scheduler.startUp
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
config.autoCommitIntervalMs, false)
}
def this(config: ConsumerConfig) = this(config, true)

View File

@ -18,7 +18,7 @@
package kafka.log
import kafka.api.OffsetRequest
import java.io.{IOException, RandomAccessFile, File}
import java.io.{IOException, File}
import java.util.{Comparator, Collections, ArrayList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
import kafka.utils._
@ -105,10 +105,8 @@ class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long
* of data to be deleted, we have to compute the offset relative to start of the log segment
* @param offset Absolute offset for this partition
*/
def truncateUpto(offset: Long) = {
assert(offset >= start, "Offset %d used for truncating this log segment cannot be smaller than the start offset %d".
format(offset, start))
messageSet.truncateUpto(offset - start)
def truncateTo(offset: Long) = {
messageSet.truncateTo(offset - start)
}
}
@ -134,13 +132,6 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In
/* The actual segments of the log */
private[log] val segments: SegmentList[LogSegment] = loadSegments()
/* create the leader highwatermark file handle */
private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
info("Created highwatermark file %s for log %s".format(dir.getAbsolutePath + "/" + hwFileName, name))
/* If hw file is absent, the hw defaults to 0. If it exists, hw is set to the checkpointed value */
private var hw: Long = if(hwFile.length() > 0) hwFile.readLong() else { hwFile.writeLong(0); 0 }
private val logStats = new LogStats(this)
Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
@ -221,8 +212,6 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In
info("Closing log segment " + seg.file.getAbsolutePath)
seg.messageSet.close()
}
checkpointHW()
hwFile.close()
}
}
@ -363,7 +352,6 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In
segments.view.last.messageSet.flush()
unflushed.set(0)
lastflushedTime.set(System.currentTimeMillis)
checkpointHW()
}
}
@ -435,47 +423,31 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In
total
}
def recoverUptoLastCheckpointedHW() {
if(hwFile.length() > 0) {
// read the last checkpointed hw from disk
hwFile.seek(0)
val lastKnownHW = hwFile.readLong()
info("Recovering log %s upto highwatermark %d".format(name, lastKnownHW))
def truncateTo(targetOffset: Long) {
// find the log segment that has this hw
val segmentToBeTruncated = segments.view.find(segment =>
lastKnownHW >= segment.start && lastKnownHW < segment.absoluteEndOffset)
targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
segmentToBeTruncated match {
case Some(segment) =>
segment.truncateUpto(lastKnownHW)
info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, hw))
val truncatedSegmentIndex = segments.view.indexOf(segment)
segments.truncLast(truncatedSegmentIndex)
segment.truncateTo(targetOffset)
info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
case None =>
assert(lastKnownHW <= logEndOffset,
assert(targetOffset <= segments.view.last.absoluteEndOffset,
"Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
format(lastKnownHW, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
.format(lastKnownHW, segments.view.head.start, logEndOffset))
.format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
}
val segmentsToBeDeleted = segments.view.filter(segment => segment.start > lastKnownHW)
val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
if(segmentsToBeDeleted.size < segments.view.size) {
val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
if(numSegmentsDeleted != segmentsToBeDeleted.size)
error("Failed to delete some segments during log recovery")
}else
info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
}
def setHW(latestLeaderHW: Long) {
hw = latestLeaderHW
}
def getHW(): Long = hw
def checkpointHW() {
hwFile.seek(0)
hwFile.writeLong(hw)
hwFile.getChannel.force(true)
info("Checkpointed highwatermark %d for log %s".format(hw, name))
}
}
def topicName():String = {

View File

@ -30,6 +30,7 @@ import kafka.common.{KafkaException, InvalidTopicException, InvalidPartitionExce
*/
@threadsafe
private[kafka] class LogManager(val config: KafkaConfig,
scheduler: KafkaScheduler,
private val time: Time,
val logCleanupIntervalMs: Long,
val logCleanupDefaultAgeMs: Long,
@ -40,11 +41,9 @@ private[kafka] class LogManager(val config: KafkaConfig,
private val maxSize: Long = config.logFileSize
private val flushInterval = config.flushInterval
private val logCreationLock = new Object
private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
private val logFlushIntervals = config.flushIntervalMap
private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
private val logRetentionSize = config.logRetentionSize
private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
/* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]()
@ -76,17 +75,13 @@ private[kafka] class LogManager(val config: KafkaConfig,
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
if(scheduler.hasShutdown) {
println("Restarting log cleaner scheduler")
scheduler.startUp
}
info("starting log cleaner every " + logCleanupIntervalMs + " ms")
scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
info("Starting log flusher every " + config.flushSchedulerThreadRate +
" ms with the following overrides " + logFlushIntervals)
scheduler.scheduleWithRate(flushAllLogs, "kafka-logflusher-",
config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
}
if(logFlusherScheduler.hasShutdown) logFlusherScheduler.startUp
info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervals)
logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
}
@ -95,7 +90,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
*/
private def createLog(topic: String, partition: Int): Log = {
if (topic.length <= 0)
throw new InvalidTopicException("topic name can't be emtpy")
throw new InvalidTopicException("Topic name can't be emtpy")
if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
val error = "Wrong partition %d, valid partitions (0, %d)."
.format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
@ -207,10 +202,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
/**
* Close all the logs
*/
def close() {
def shutdown() {
info("Closing log manager")
scheduler.shutdown()
logFlusherScheduler.shutdown()
allLogs.foreach(_.close())
}
@ -220,7 +213,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
def allLogs() = logs.values.flatMap(_.values)
private def flushAllLogs() = {
debug("flushing the high watermark of all logs")
debug("Flushing the high watermark of all logs")
for (log <- allLogs)
{
try{

View File

@ -36,7 +36,7 @@ class LogStats(val log: Log) extends LogStatsMBean {
def getNumberOfSegments: Int = log.numberOfSegments
def getCurrentOffset: Long = log.getHW()
def getCurrentOffset: Long = log.logEndOffset
def getNumAppendedMessages: Long = numCumulatedMessages.get

View File

@ -196,9 +196,12 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
len - validUpTo
}
def truncateUpto(hw: Long) = {
channel.truncate(hw)
setSize.set(hw)
def truncateTo(targetSize: Long) = {
if(targetSize >= sizeInBytes())
throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
" size of this log segment is only %d bytes".format(sizeInBytes()))
channel.truncate(targetSize)
setSize.set(targetSize)
}
/**

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.Logging
import java.util.concurrent.locks.ReentrantLock
import java.io._
/**
* This class handles the read/write to the highwaterMark checkpoint file. The file stores the high watermark value for
* all topics and partitions that this broker hosts. The format of this file is as follows -
* version
* number of entries
* topic partition highwatermark
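* For example, a checkpoint covering two partitions might read:
* 0
* 2
* mytopic 0 42
* mytopic 1 7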
*/
object HighwaterMarkCheckpoint {
val highWatermarkFileName = ".highwatermark"
val currentHighwaterMarkFileVersion = 0
}
class HighwaterMarkCheckpoint(val path: String) extends Logging {
/* create the highwatermark file handle for all partitions */
val name = path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName
private val hwFile = new File(name)
private val hwFileLock = new ReentrantLock()
// recover from previous tmp file, if required
def write(highwaterMarksPerPartition: Map[(String, Int), Long]) {
hwFileLock.lock()
try {
// write to temp file and then swap with the highwatermark file
val tempHwFile = new File(hwFile + ".tmp")
// it is an error for this file to be present. It could mean that the previous rename operation failed
if(tempHwFile.exists()) {
fatal("Temporary high watermark %s file exists. This could mean that the ".format(tempHwFile.getAbsolutePath) +
"previous high watermark checkpoint operation has failed.")
System.exit(1)
}
val hwFileWriter = new BufferedWriter(new FileWriter(tempHwFile))
// checkpoint highwatermark for all partitions
// write the current version
hwFileWriter.write(HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion.toString)
hwFileWriter.newLine()
// write the number of entries in the highwatermark file
hwFileWriter.write(highwaterMarksPerPartition.size.toString)
hwFileWriter.newLine()
highwaterMarksPerPartition.foreach { partitionAndHw =>
val topic = partitionAndHw._1._1
val partitionId = partitionAndHw._1._2
hwFileWriter.write("%s %s %s".format(topic, partitionId, partitionAndHw._2))
hwFileWriter.newLine()
}
hwFileWriter.flush()
hwFileWriter.close()
// swap new high watermark file with previous one
hwFile.delete()
if(!tempHwFile.renameTo(hwFile)) {
fatal("Attempt to swap the new high watermark file with the old one failed")
System.exit(1)
}
}finally {
hwFileLock.unlock()
}
}
def read(topic: String, partition: Int): Long = {
hwFileLock.lock()
try {
hwFile.length() match {
case 0 => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
"partition %d. Returning 0 as the highwatermark".format(partition))
0L
case _ =>
val hwFileReader = new BufferedReader(new FileReader(hwFile))
val version = hwFileReader.readLine().toShort
version match {
case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion =>
val numberOfHighWatermarks = hwFileReader.readLine().toInt
val partitionHighWatermarks =
for(i <- 0 until numberOfHighWatermarks) yield {
val nextHwEntry = hwFileReader.readLine()
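// each entry line has the form "topic partition highwatermark"; the numeric fields are parsed from the right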
val partitionHwInfo = nextHwEntry.split(" ")
val highwaterMark = partitionHwInfo.last.toLong
val partitionId = partitionHwInfo.takeRight(2).head
// find the index of partition
val partitionIndex = nextHwEntry.indexOf(partitionId)
val topic = nextHwEntry.substring(0, partitionIndex-1)
((topic, partitionId.toInt) -> highwaterMark)
}
hwFileReader.close()
val hwOpt = partitionHighWatermarks.toMap.get((topic, partition))
hwOpt match {
case Some(hw) => debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file"
.format(hw, topic, partition))
hw
case None => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
"partition %d. Returning 0 as the highwatermark".format(partition))
0L
}
case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version)
System.exit(1)
-1L
}
}
}finally {
hwFileLock.unlock()
}
}
}
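A possible end-to-end use of this class, with the directory and offsets purely illustrative: the broker writes the full map once per flush interval and reads individual partitions back on startup.
  val checkpoint = new HighwaterMarkCheckpoint("/tmp/kafka-logs")
  checkpoint.write(Map(("mytopic", 0) -> 42L, ("mytopic", 1) -> 7L))
  val hw = checkpoint.read("mytopic", 0) // 42L; a partition with no entry reads as 0L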

View File

@ -171,7 +171,7 @@ class KafkaApis(val requestChannel: RequestChannel,
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition)
replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset)
offsets(msgIndex) = log.logEndOffset
errors(msgIndex) = ErrorMapping.NoError.toShort
trace(partitionData.messages.sizeInBytes + " bytes written to logs.")

View File

@ -40,11 +40,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
private var logManager: LogManager = null
var logManager: LogManager = null
var kafkaZookeeper: KafkaZooKeeper = null
private var replicaManager: ReplicaManager = null
var replicaManager: ReplicaManager = null
private var apis: KafkaApis = null
var kafkaController: KafkaController = new KafkaController(config)
val kafkaScheduler = new KafkaScheduler(4)
var zkClient: ZkClient = null
/**
@ -62,9 +63,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
cleanShutDownFile.delete
}
/* start client */
info("connecting to ZK: " + config.zkConnect)
info("Connecting to ZK: " + config.zkConnect)
zkClient = KafkaZookeeperClient.getZookeeperClient(config)
/* start scheduler */
kafkaScheduler.startUp
/* start log manager */
logManager = new LogManager(config,
kafkaScheduler,
SystemTime,
1000L * 60 * config.logCleanupIntervalMinutes,
1000L * 60 * 60 * config.logRetentionHours,
@ -80,7 +85,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
replicaManager = new ReplicaManager(config, time, zkClient)
replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler)
apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
@ -90,6 +95,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
// starting relevant replicas and leader election for partitions assigned to this broker
kafkaZookeeper.startup()
// start the replica manager
replicaManager.startup()
// start the controller
kafkaController.startup()
info("Server started.")
@ -103,23 +111,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
info("Shutting down Kafka server with id " + config.brokerId)
kafkaScheduler.shutdown()
apis.close()
if(replicaManager != null)
replicaManager.close()
replicaManager.shutdown()
if (socketServer != null)
socketServer.shutdown()
if(requestHandlerPool != null)
requestHandlerPool.shutdown()
Utils.unregisterMBean(statsMBeanName)
if(logManager != null)
logManager.close()
logManager.shutdown()
if(kafkaController != null)
kafkaController.shutDown()
kafkaZookeeper.close
info("Closing zookeeper client...")
kafkaZookeeper.shutdown()
zkClient.close()
val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
cleanShutDownFile.createNewFile

View File

@ -104,7 +104,7 @@ class KafkaZooKeeper(config: KafkaConfig,
}
}
def close() {
def shutdown() {
stateChangeHandler.shutdown()
}

View File

@ -13,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
*/
package kafka.server
import kafka.log.Log
@ -25,21 +25,28 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
import kafka.common.{KafkaException, InvalidPartitionException}
class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient, kafkaScheduler: KafkaScheduler)
extends Logging {
private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
private var leaderReplicas = new ListBuffer[Partition]()
private val leaderReplicaLock = new ReentrantLock()
private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
info("Created highwatermark file %s on broker %d".format(highwaterMarkCheckpoint.name, config.brokerId))
// start ISR expiration thread
isrExpirationScheduler.startUp
isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.replicaMaxLagTimeMs)
def startup() {
// start the highwatermark checkpoint thread
kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0,
config.defaultFlushIntervalMs)
// start ISR expiration thread
kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
}
def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
val localReplica = new Replica(config.brokerId, partition, topic, Some(log))
val localReplica = new Replica(config.brokerId, partition, topic, time,
Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
val replicaOpt = partition.getReplica(config.brokerId)
replicaOpt match {
@ -81,12 +88,12 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
case Some(partition) => partition
case None =>
throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
.format(topic, partitionId, config.brokerId))
.format(topic, partitionId, config.brokerId))
}
}
def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
val remoteReplica = new Replica(replicaId, partition, topic)
val remoteReplica = new Replica(replicaId, partition, topic, time)
val replicaAdded = partition.addReplica(remoteReplica)
if(replicaAdded)
@ -112,20 +119,17 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
Some(replicas.leaderReplica())
case None =>
throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
"%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
"%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
}
}
def getPartition(topic: String, partitionId: Int): Option[Partition] =
allReplicas.get((topic, partitionId))
def updateReplicaLEO(replica: Replica, fetchOffset: Long) {
private def updateReplicaLeo(replica: Replica, fetchOffset: Long) {
// set the replica leo
val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
partition.updateReplicaLEO(replica, fetchOffset)
partition.updateReplicaLeo(replica, fetchOffset)
}
def maybeIncrementLeaderHW(replica: Replica) {
private def maybeIncrementLeaderHW(replica: Replica) {
// set the replica leo
val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
// set the leader HW to min of the leo of all replicas
@ -141,58 +145,70 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
}
def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
// read and cache the ISR
replica.partition.leaderId(Some(replica.brokerId))
replica.partition.updateISR(currentISRInZk.toSet)
// stop replica fetcher thread, if any
replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// also add this partition to the list of partitions for which the leader is the current broker
info("Broker %d started the leader state transition for topic %s partition %d"
.format(config.brokerId, replica.topic, replica.partition.partitionId))
try {
// read and cache the ISR
replica.partition.leaderId(Some(replica.brokerId))
replica.partition.updateISR(currentISRInZk.toSet)
// stop replica fetcher thread, if any
replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// also add this partition to the list of partitions for which the leader is the current broker
leaderReplicaLock.lock()
leaderReplicas += replica.partition
info("Broker %d completed the leader state transition for topic %s partition %d"
.format(config.brokerId, replica.topic, replica.partition.partitionId))
}catch {
case e => error("Broker %d failed to complete the leader state transition for topic %s partition %d"
.format(config.brokerId, replica.topic, replica.partition.partitionId), e)
}finally {
leaderReplicaLock.unlock()
}
}
def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
info("broker %d intending to follow leader %d for topic %s partition %d"
info("Broker %d starting the follower state transition to follow leader %d for topic %s partition %d"
.format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
// set the leader for this partition correctly on this broker
replica.partition.leaderId(Some(leaderBrokerId))
// remove this replica's partition from the ISR expiration queue
try {
// set the leader for this partition correctly on this broker
replica.partition.leaderId(Some(leaderBrokerId))
replica.log match {
case Some(log) => // log is already started
log.truncateTo(replica.highWatermark())
case None =>
}
// get leader for this replica
val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
// become follower only if it is not already following the same leader
if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
info("broker %d becoming follower to leader %d for topic %s partition %d"
.format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
// stop fetcher thread to previous leader
replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// start fetcher thread to current leader
replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker)
}
// remove this replica's partition from the ISR expiration queue
leaderReplicaLock.lock()
leaderReplicas -= replica.partition
info("Broker %d completed the follower state transition to follow leader %d for topic %s partition %d"
.format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
}catch {
case e => error("Broker %d failed to complete the follower state transition to follow leader %d for topic %s partition %d"
.format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId), e)
}finally {
leaderReplicaLock.unlock()
}
replica.log match {
case Some(log) => // log is already started
log.recoverUptoLastCheckpointedHW()
case None =>
}
// get leader for this replica
val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
// Become follower only if it is not already following the same leader
if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
info("broker %d becoming follower to leader %d for topic %s partition %d"
.format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
// stop fetcher thread to previous leader
replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// start fetcher thread to current leader
replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker)
}
}
def maybeShrinkISR(): Unit = {
private def maybeShrinkISR(): Unit = {
try {
info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
.format(config.replicaMaxLagTimeMs))
leaderReplicaLock.lock()
leaderReplicas.foreach { partition =>
// shrink ISR if a follower is slow or stuck
// shrink ISR if a follower is slow or stuck
val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
@ -210,7 +226,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
}
}
def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
private def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
if(partition.inSyncReplicas.contains(replica)) false
else if(partition.assignedReplicas().contains(replica)) {
@ -225,7 +241,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
val replicaOpt = getReplica(topic, partition, replicaId)
replicaOpt match {
case Some(replica) =>
updateReplicaLEO(replica, offset)
updateReplicaLeo(replica, offset)
// check if this replica needs to be added to the ISR
if(checkIfISRCanBeExpanded(replica)) {
val newISR = replica.partition.inSyncReplicas + replica
@ -239,20 +255,45 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
}
}
def recordLeaderLogUpdate(topic: String, partition: Int) = {
def recordLeaderLogEndOffset(topic: String, partition: Int, logEndOffset: Long) = {
val replicaOpt = getReplica(topic, partition, config.brokerId)
replicaOpt match {
case Some(replica) =>
replica.logEndOffsetUpdateTime(Some(time.milliseconds))
case Some(replica) => replica.logEndOffset(Some(logEndOffset))
case None =>
throw new KafkaException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
}
}
def close() {
info("Closing replica manager on broker " + config.brokerId)
isrExpirationScheduler.shutdown()
/**
* Flushes the highwatermark value for all partitions to the highwatermark file
*/
private def checkpointHighwaterMarks() {
val highwaterMarksForAllPartitions = allReplicas.map { partition =>
val topic = partition._1._1
val partitionId = partition._1._2
val localReplicaOpt = partition._2.getReplica(config.brokerId)
val hw = localReplicaOpt match {
case Some(localReplica) => localReplica.highWatermark()
case None =>
error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) +
" Replica metadata doesn't exist in replica manager on broker " + config.brokerId)
0L
}
(topic, partitionId) -> hw
}.toMap
highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
info("Checkpointed highwatermarks")
}
/**
* Reads the checkpointed high watermark for a single partition
* @return the checkpointed high watermark for the given topic and partition; 0 if no entry exists
*/
def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition)
def shutdown() {
replicaFetcherManager.shutdown()
checkpointHighwaterMarks()
info("Replica manager shutdown on broker " + config.brokerId)
}
}
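Taken together: produce requests advance the leader's leo through recordLeaderLogEndOffset, follower fetches advance the remote leos and may raise the leader HW, and checkpointHighwaterMarks periodically persists every local replica's HW via the checkpoint file above. A hedged sketch of the "min of the leo of all replicas" rule that maybeIncrementLeaderHW's comment describes (the helper name is illustrative, not part of this patch):
  def candidateLeaderHw(partition: Partition): Long =
    partition.inSyncReplicas.map(_.logEndOffset()).min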

View File

@ -17,7 +17,6 @@
package kafka.utils
import kafka.common.KafkaException
import java.lang.IllegalStateException
class State

View File

@ -18,20 +18,24 @@
package kafka.utils
import java.util.concurrent._
import java.util.concurrent.atomic._
import atomic._
import collection.mutable.HashMap
/**
* A scheduler for running jobs in the background
*/
class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) extends Logging {
private val threadId = new AtomicLong(0)
class KafkaScheduler(val numThreads: Int) extends Logging {
private var executor:ScheduledThreadPoolExecutor = null
startUp
private val daemonThreadFactory = new ThreadFactory() {
def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, true)
}
private val nonDaemonThreadFactory = new ThreadFactory() {
def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, false)
}
private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
def startUp = {
executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
def newThread(runnable: Runnable): Thread = Utils.daemonThread(baseThreadName + threadId.getAndIncrement, runnable)
})
executor = new ScheduledThreadPoolExecutor(numThreads)
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
}
@ -43,20 +47,26 @@ class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon:
throw new IllegalStateException("Kafka scheduler has not been started")
}
def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) = {
def scheduleWithRate(fun: () => Unit, name: String, delayMs: Long, periodMs: Long, isDaemon: Boolean = true) = {
ensureExecutorHasStarted
executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
if(isDaemon)
executor.setThreadFactory(daemonThreadFactory)
else
executor.setThreadFactory(nonDaemonThreadFactory)
val threadId = threadNamesAndIds.getOrElseUpdate(name, new AtomicInteger(0))
executor.scheduleAtFixedRate(Utils.loggedRunnable(fun, name + threadId.incrementAndGet()), delayMs, periodMs,
TimeUnit.MILLISECONDS)
}
def shutdownNow() {
ensureExecutorHasStarted
executor.shutdownNow()
info("Forcing shutdown of scheduler " + baseThreadName)
info("Forcing shutdown of Kafka scheduler")
}
def shutdown() {
ensureExecutorHasStarted
executor.shutdown()
info("Shutdown scheduler " + baseThreadName)
info("Shutdown Kafka scheduler")
}
}
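With the thread name and daemon flag now supplied per task rather than per scheduler, a caller schedules a job roughly like this (task body, name, and intervals are illustrative):
  val scheduler = new KafkaScheduler(1)
  scheduler.startUp
  scheduler.scheduleWithRate(() => println("tick"), "demo-task-", 0, 1000, isDaemon = false)
  // ... later
  scheduler.shutdown()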

View File

@ -58,9 +58,10 @@ object Utils extends Logging {
* @param fun A function
* @return A Runnable that just executes the function
*/
def loggedRunnable(fun: () => Unit): Runnable =
def loggedRunnable(fun: () => Unit, name: String): Runnable =
new Runnable() {
def run() = {
Thread.currentThread().setName(name)
try {
fun()
}
@ -72,6 +73,14 @@ object Utils extends Logging {
}
}
/**
* Create a daemon thread
* @param runnable The runnable to execute in the background
* @return The unstarted thread
*/
def daemonThread(runnable: Runnable): Thread =
newThread(runnable, true)
/**
* Create a daemon thread
* @param name The name of the thread
@ -108,6 +117,23 @@ object Utils extends Logging {
thread
}
/**
* Create a new thread
* @param runnable The work for the thread to do
* @param daemon Should the thread block JVM shutdown?
* @return The unstarted thread
*/
def newThread(runnable: Runnable, daemon: Boolean): Thread = {
val thread = new Thread(runnable)
thread.setDaemon(daemon)
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
def uncaughtException(t: Thread, e: Throwable) {
error("Uncaught exception in thread '" + t.getName + "':", e)
}
})
thread
}
/**
* Read a byte array from the given offset and size in the buffer
* TODO: Should use System.arraycopy

View File

@ -37,6 +37,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
val zookeeperConnect = TestZKUtils.zookeeperConnect
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
val scheduler = new KafkaScheduler(2)
override def setUp() {
super.setUp()
@ -45,7 +46,8 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
override val logFileSize = 1024
override val flushInterval = 100
}
logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
scheduler.startUp
logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
logManager.startup
logDir = logManager.logDir
@ -56,8 +58,9 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
}
override def tearDown() {
scheduler.shutdown()
if(logManager != null)
logManager.close()
logManager.shutdown()
Utils.rm(logDir)
super.tearDown()
}
@ -114,7 +117,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
val retentionHours = 1
val retentionMs = 1000 * 60 * 60 * retentionHours
val props = TestUtils.createBrokerConfig(0, -1)
logManager.close
logManager.shutdown()
Thread.sleep(100)
config = new KafkaConfig(props) {
override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
@ -122,7 +125,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
override val logRetentionHours = retentionHours
override val flushInterval = 100
}
logManager = new LogManager(config, time, veryLargeLogFlushInterval, retentionMs, false)
logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, retentionMs, false)
logManager.startup
// create a log
@ -159,7 +162,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testTimeBasedFlush() {
val props = TestUtils.createBrokerConfig(0, -1)
logManager.close
logManager.shutdown()
Thread.sleep(100)
config = new KafkaConfig(props) {
override val logFileSize = 1024 *1024 *1024
@ -167,7 +170,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
override val flushInterval = Int.MaxValue
override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
}
logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
logManager.startup
val log = logManager.getOrCreateLog(name, 0)
for(i <- 0 until 200) {
@ -182,15 +185,14 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testConfigurablePartitions() {
val props = TestUtils.createBrokerConfig(0, -1)
logManager.close
logManager.shutdown()
Thread.sleep(100)
config = new KafkaConfig(props) {
override val logFileSize = 256
override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
override val flushInterval = 100
}
logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
logManager.startup
for(i <- 0 until 1) {

View File

@ -13,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
*/
package kafka.producer
@ -35,7 +35,7 @@ import java.util
class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
private val brokerId1 = 0
private val brokerId2 = 1
private val brokerId2 = 1
private val ports = TestUtils.choosePorts(2)
private val (port1, port2) = (ports(0), ports(1))
private var server1: KafkaServer = null
@ -80,7 +80,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
server2.shutdown
server2.awaitShutdown()
Utils.rm(server1.config.logDir)
Utils.rm(server2.config.logDir)
Utils.rm(server2.config.logDir)
Thread.sleep(500)
super.tearDown()
}
@ -97,7 +97,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val props2 = new util.Properties()
props2.putAll(props1)
props2.put("producer.request.required.acks", "3")
props2.put("producer.request.ack.timeout.ms", "1000")
props2.put("producer.request.timeout.ms", "1000")
val config1 = new ProducerConfig(props1)
val config2 = new ProducerConfig(props2)
@ -108,33 +108,28 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val producer1 = new Producer[String, String](config1)
val producer2 = new Producer[String, String](config2)
try {
// Available partition ids should be 0.
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
// get the leader
val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
val leader = leaderOpt.get
// Available partition ids should be 0.
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
// get the leader
val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
val leader = leaderOpt.get
val messageSet = if(leader == server1.config.brokerId) {
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
response1.messageSet("new-topic", 0).iterator
}else {
val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
response2.messageSet("new-topic", 0).iterator
}
assertTrue("Message set should have 1 message", messageSet.hasNext)
assertEquals(new Message("test1".getBytes), messageSet.next.message)
assertTrue("Message set should have 1 message", messageSet.hasNext)
assertEquals(new Message("test1".getBytes), messageSet.next.message)
assertFalse("Message set should not have any more messages", messageSet.hasNext)
} catch {
case e: Exception => fail("Not expected", e)
} finally {
producer1.close()
val messageSet = if(leader == server1.config.brokerId) {
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
response1.messageSet("new-topic", 0).iterator
}else {
val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
response2.messageSet("new-topic", 0).iterator
}
assertTrue("Message set should have 1 message", messageSet.hasNext)
assertEquals(new Message("test1".getBytes), messageSet.next.message)
assertTrue("Message set should have 1 message", messageSet.hasNext)
assertEquals(new Message("test1".getBytes), messageSet.next.message)
assertFalse("Message set should not have any more messages", messageSet.hasNext)
producer1.close()
try {
producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test2")))

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.log.Log
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
import org.easymock.EasyMock
import org.junit.Assert._
import kafka.utils.{KafkaScheduler, TestUtils, MockTime}
import kafka.common.KafkaException
class HighwatermarkPersistenceTest extends JUnit3Suite {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
override val defaultFlushIntervalMs = 100
})
val topic = "foo"
def testHighWatermarkPersistenceSinglePartition() {
// mock zkclient
val zkClient = EasyMock.createMock(classOf[ZkClient])
EasyMock.replay(zkClient)
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
scheduler.startUp
// create replica manager
val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
replicaManager.startup()
// sleep until flush ms
Thread.sleep(configs.head.defaultFlushIntervalMs)
var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
assertEquals(0L, fooPartition0Hw)
val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
// create leader log
val log0 = getMockLog
// create leader and follower replicas
val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
// sleep until flush ms
Thread.sleep(configs.head.defaultFlushIntervalMs)
fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
try {
followerReplicaPartition0.highWatermark()
fail("Should fail with IllegalStateException")
}catch {
case e: KafkaException => // this is ok
}
// set the leader
partition0.leaderId(Some(leaderReplicaPartition0.brokerId))
// set the highwatermark for local replica
partition0.leaderHW(Some(5L))
// sleep until flush interval
Thread.sleep(configs.head.defaultFlushIntervalMs)
fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
EasyMock.verify(zkClient)
EasyMock.verify(log0)
}
def testHighWatermarkPersistenceMultiplePartitions() {
val topic1 = "foo1"
val topic2 = "foo2"
// mock zkclient
val zkClient = EasyMock.createMock(classOf[ZkClient])
EasyMock.replay(zkClient)
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
scheduler.startUp
// create replica manager
val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
replicaManager.startup()
// sleep until flush ms
Thread.sleep(configs.head.defaultFlushIntervalMs)
var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
assertEquals(0L, topic1Partition0Hw)
val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet)
// create leader log
val topic1Log0 = getMockLog
// create leader and follower replicas
val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet)
// sleep until flush ms
Thread.sleep(configs.head.defaultFlushIntervalMs)
topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw)
// set the leader
topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId))
// set the highwatermark for local replica
topic1Partition0.leaderHW(Some(5L))
// sleep until flush interval
Thread.sleep(configs.head.defaultFlushIntervalMs)
topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark())
assertEquals(5L, topic1Partition0Hw)
// add another partition and set highwatermark
val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, configs.map(_.brokerId).toSet)
// create leader log
val topic2Log0 = getMockLog
// create leader and follower replicas
val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet)
// sleep until flush ms
Thread.sleep(configs.head.defaultFlushIntervalMs)
var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw)
// set the leader
topic2Partition0.leaderId(Some(leaderReplicaTopic2Partition0.brokerId))
// set the highwatermark for local replica
topic2Partition0.leaderHW(Some(15L))
assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark())
// change the highwatermark for topic1
topic1Partition0.leaderHW(Some(10L))
assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark())
// sleep until flush interval
Thread.sleep(configs.head.defaultFlushIntervalMs)
// verify checkpointed hw for topic 2
topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
assertEquals(15L, topic2Partition0Hw)
// verify checkpointed hw for topic 1
topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
assertEquals(10L, topic1Partition0Hw)
EasyMock.verify(zkClient)
EasyMock.verify(topic1Log0)
EasyMock.verify(topic2Log0)
}
private def getMockLog: Log = {
val log = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.replay(log)
log
}
}

View File

@ -22,9 +22,9 @@ import collection.mutable.Map
import kafka.cluster.{Partition, Replica}
import org.easymock.EasyMock
import kafka.log.Log
import kafka.utils.{Time, MockTime, TestUtils}
import org.junit.Assert._
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{KafkaScheduler, Time, MockTime, TestUtils}
class ISRExpirationTest extends JUnit3Suite {
@ -40,7 +40,6 @@ class ISRExpirationTest extends JUnit3Suite {
// create leader replica
val log = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
EasyMock.expect(log.setHW(5L)).times(1)
EasyMock.replay(log)
// add one partition
@ -48,10 +47,10 @@ class ISRExpirationTest extends JUnit3Suite {
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
// set remote replicas leo to something low, like 2
(partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 2))
(partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 2))
time.sleep(150)
leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
leaderReplica.logEndOffset(Some(5L))
var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@ -60,9 +59,9 @@ class ISRExpirationTest extends JUnit3Suite {
partition0.inSyncReplicas ++= partition0.assignedReplicas()
assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
leaderReplica.logEndOffset(Some(5L))
// let the follower catch up only upto 3
(partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
(partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 3))
time.sleep(150)
// now follower broker id 1 has caught upto only 3, while the leader is at 5 AND follower broker id 1 hasn't
// pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
@ -80,12 +79,12 @@ class ISRExpirationTest extends JUnit3Suite {
assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
val leaderReplica = partition0.getReplica(configs.head.brokerId).get
// set remote replicas leo to something low, like 4
(partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 4))
(partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 4))
time.sleep(150)
leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
leaderReplica.logEndOffset(Some(15L))
time.sleep(10)
(partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
(partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffset(Some(4)))
val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@ -98,8 +97,10 @@ class ISRExpirationTest extends JUnit3Suite {
// mock zkclient
val zkClient = EasyMock.createMock(classOf[ZkClient])
EasyMock.replay(zkClient)
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
// create replica manager
val replicaManager = new ReplicaManager(configs.head, time, zkClient)
val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler)
try {
val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
// create leader log
@ -115,7 +116,7 @@ class ISRExpirationTest extends JUnit3Suite {
partition0.leaderHW(Some(5L))
// set the leo for non-leader replicas to something low
(partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLEO(r, 2))
(partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLeo(r, 2))
val log1 = getLogWithHW(15L)
// create leader and follower replicas for partition 1
@ -129,13 +130,13 @@ class ISRExpirationTest extends JUnit3Suite {
partition1.leaderHW(Some(15L))
// set the leo for non-leader replicas to something low
(partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLEO(r, 4))
(partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLeo(r, 4))
time.sleep(150)
leaderReplicaPartition0.logEndOffsetUpdateTime(Some(time.milliseconds))
leaderReplicaPartition1.logEndOffsetUpdateTime(Some(time.milliseconds))
leaderReplicaPartition0.logEndOffset(Some(4L))
leaderReplicaPartition1.logEndOffset(Some(4L))
time.sleep(10)
(partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
(partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffset(Some(4L)))
val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@ -148,16 +149,16 @@ class ISRExpirationTest extends JUnit3Suite {
}catch {
case e => e.printStackTrace()
}finally {
replicaManager.close()
replicaManager.shutdown()
}
}
private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
localLog: Log, leaderHW: Long): Partition = {
val partition = new Partition(topic, partitionId, time)
val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog))
val leaderReplica = new Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog))
val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica
val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
partition.assignedReplicas(Some(allReplicas.toSet))
// set in sync replicas for this partition to all the assigned replicas
partition.inSyncReplicas = allReplicas.toSet
@ -170,15 +171,14 @@ class ISRExpirationTest extends JUnit3Suite {
private def getLogWithHW(hw: Long): Log = {
val log1 = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
EasyMock.expect(log1.setHW(hw)).times(1)
EasyMock.replay(log1)
log1
}
private def getFollowerReplicas(partition: Partition, leaderId: Int): Seq[Replica] = {
private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
configs.filter(_.brokerId != leaderId).map { config =>
new Replica(config.brokerId, partition, topic)
new Replica(config.brokerId, partition, topic, time)
}
}
}
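These tests drive staleness with a mock clock: time.sleep(150) advances the fake clock past the configured lag threshold without actually blocking, so the ISR check sees stale follower timestamps deterministically. A compressed sketch of the pattern (MockClock and the 100 ms threshold are illustrative, not the test harness API):

// Illustrative mock clock; sleep() skews time instead of blocking.
class MockClock(private var nowMs: Long = 0L) {
  def milliseconds: Long = nowMs
  def sleep(deltaMs: Long): Unit = nowMs += deltaMs
}

val clock = new MockClock
val followerUpdatedAt = clock.milliseconds // follower's last leo update
clock.sleep(150)                           // no real waiting, just clock skew
val maxLagMs = 100L                        // illustrative lag threshold
assert(clock.milliseconds - followerUpdatedAt > maxLagMs) // follower now looks stuck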

View File

@ -7,7 +7,6 @@ import kafka.utils.TestUtils._
import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import kafka.message.Message
import java.io.RandomAccessFile
import kafka.producer.{ProducerConfig, ProducerData, Producer}
import org.junit.Test
@ -34,15 +33,12 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
val configProps1 = configs.head
val configProps2 = configs.last
val server1HWFile = configProps1.logDir + "/" + topic + "-0/highwatermark"
val server2HWFile = configProps2.logDir + "/" + topic + "-0/highwatermark"
val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
var producer: Producer[Int, Message] = null
var hwFile1: RandomAccessFile = null
var hwFile2: RandomAccessFile = null
var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir)
var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir)
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
@Test
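From this point the test reads high watermarks through a HighwaterMarkCheckpoint built per log directory, replacing the per-partition RandomAccessFile. A plausible self-contained sketch of such a checkpoint follows; the on-disk layout (one "topic partition hw" line per entry, swapped in via a temp file) is an assumption, not necessarily the committed format:

import java.io.{BufferedWriter, File, FileWriter}
import scala.io.Source

// Sketch of a single-file high watermark checkpoint covering every
// partition in a log directory. The exact file format is assumed.
class HwCheckpointSketch(logDir: String) {
  private val file = new File(logDir, ".highwatermark")

  def write(hws: Map[(String, Int), Long]): Unit = {
    val tmp = new File(file.getAbsolutePath + ".tmp")
    val out = new BufferedWriter(new FileWriter(tmp))
    try {
      hws.foreach { case ((topic, partition), hw) =>
        out.write("%s %d %d".format(topic, partition, hw))
        out.newLine()
      }
    } finally {
      out.close()
    }
    tmp.renameTo(file) // swap in so readers never see a half-written file
  }

  def read(topic: String, partition: Int): Long = {
    if (!file.exists) return 0L // nothing checkpointed yet
    val src = Source.fromFile(file)
    try {
      src.getLines()
        .map(_.trim.split(" "))
        .collectFirst { case Array(t, p, hw) if t == topic && p.toInt == partition => hw.toLong }
        .getOrElse(0L)
    } finally {
      src.close()
    }
  }
}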
@ -66,22 +62,15 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
sendMessages()
hwFile1 = new RandomAccessFile(server1HWFile, "r")
hwFile2 = new RandomAccessFile(server2HWFile, "r")
sendMessages()
sendMessages(2)
// don't wait for follower to read the leader's hw
// shutdown the servers to allow the hw to be checkpointed
servers.map(server => server.shutdown())
producer.close()
val leaderHW = readHW(hwFile1)
val leaderHW = hwFile1.read(topic, 0)
assertEquals(60L, leaderHW)
val followerHW = readHW(hwFile2)
val followerHW = hwFile2.read(topic, 0)
assertEquals(30L, followerHW)
hwFile1.close()
hwFile2.close()
servers.map(server => Utils.rm(server.config.logDir))
}
@ -105,16 +94,13 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
hwFile1 = new RandomAccessFile(server1HWFile, "r")
hwFile2 = new RandomAccessFile(server2HWFile, "r")
assertEquals(0L, readHW(hwFile1))
assertEquals(0L, hwFile1.read(topic, 0))
sendMessages()
// kill the server hosting the preferred replica
server1.shutdown()
assertEquals(30L, readHW(hwFile1))
assertEquals(30L, hwFile1.read(topic, 0))
// check if leader moves to the other server
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
@ -126,10 +112,10 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
assertEquals(30L, readHW(hwFile1))
assertEquals(30L, hwFile1.read(topic, 0))
// since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
server2.shutdown()
assertEquals(30L, readHW(hwFile2))
assertEquals(30L, hwFile2.read(topic, 0))
server2.startup()
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
@ -137,17 +123,13 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
sendMessages()
// give some time for follower 1 to record leader HW of 60
Thread.sleep(500)
TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 500)
// shutdown the servers to allow the hw to be checkpointed
servers.map(server => server.shutdown())
Thread.sleep(200)
producer.close()
assert(hwFile1.length() > 0)
assert(hwFile2.length() > 0)
assertEquals(60L, readHW(hwFile1))
assertEquals(60L, readHW(hwFile2))
hwFile1.close()
hwFile2.close()
assertEquals(60L, hwFile1.read(topic, 0))
assertEquals(60L, hwFile2.read(topic, 0))
servers.map(server => Utils.rm(server.config.logDir))
}
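The fixed Thread.sleep calls above give way to TestUtils.waitUntilTrue, which polls a condition up to a timeout and removes most of the timing flakiness. A minimal sketch of a polling helper in that spirit (the real TestUtils signature may differ):

// Minimal polling helper in the spirit of TestUtils.waitUntilTrue;
// the real signature may differ.
def waitUntilTrue(condition: () => Boolean, timeoutMs: Long, pollMs: Long = 50L): Boolean = {
  val deadline = System.currentTimeMillis + timeoutMs
  while (System.currentTimeMillis < deadline) {
    if (condition()) return true
    Thread.sleep(pollMs)
  }
  condition() // one final check at the deadline
}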
@ -160,14 +142,14 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
override val logFileSize = 30
})
val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
// start both servers
server1 = TestUtils.createServer(configs.head)
server2 = TestUtils.createServer(configs.last)
servers ++= List(server1, server2)
hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
@ -181,25 +163,16 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
sendMessages(10)
hwFile1 = new RandomAccessFile(server1HWFile, "r")
hwFile2 = new RandomAccessFile(server2HWFile, "r")
sendMessages(10)
sendMessages(20)
// give some time for follower 1 to record leader HW of 600
Thread.sleep(500)
TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 600L, 500)
// shutdown the servers to allow the hw to be checkpointed
servers.map(server => server.shutdown())
producer.close()
val leaderHW = readHW(hwFile1)
val leaderHW = hwFile1.read(topic, 0)
assertEquals(600L, leaderHW)
val followerHW = readHW(hwFile2)
val followerHW = hwFile2.read(topic, 0)
assertEquals(600L, followerHW)
hwFile1.close()
hwFile2.close()
servers.map(server => Utils.rm(server.config.logDir))
}
@ -208,19 +181,18 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
override val replicaMaxLagTimeMs = 5000L
override val replicaMaxLagBytes = 10L
override val flushInterval = 1000
override val flushSchedulerThreadRate = 10
override val replicaMinBytes = 20
override val logFileSize = 30
})
val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
// start both servers
server1 = TestUtils.createServer(configs.head)
server2 = TestUtils.createServer(configs.last)
servers ++= List(server1, server2)
hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
@ -235,43 +207,36 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
val hwFile1 = new RandomAccessFile(server1HWFile, "r")
val hwFile2 = new RandomAccessFile(server2HWFile, "r")
sendMessages(2)
// allow some time for the follower to get the leader HW
Thread.sleep(1000)
TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000)
// kill the server hosting the preferred replica
server1.shutdown()
server2.shutdown()
assertEquals(60L, readHW(hwFile1))
assertEquals(60L, readHW(hwFile2))
assertEquals(60L, hwFile1.read(topic, 0))
assertEquals(60L, hwFile2.read(topic, 0))
server2.startup()
// check if leader moves to the other server
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
assertEquals(60L, readHW(hwFile1))
assertEquals(60L, hwFile1.read(topic, 0))
// bring the preferred replica back
server1.startup()
assertEquals(60L, readHW(hwFile1))
assertEquals(60L, readHW(hwFile2))
assertEquals(60L, hwFile1.read(topic, 0))
assertEquals(60L, hwFile2.read(topic, 0))
sendMessages(2)
// allow some time for the follower to get the leader HW
Thread.sleep(1000)
TestUtils.waitUntilTrue(() => server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000)
// shutdown the servers to allow the hw to be checkpointed
servers.map(server => server.shutdown())
producer.close()
assert(hwFile1.length() > 0)
assert(hwFile2.length() > 0)
assertEquals(120L, readHW(hwFile1))
assertEquals(120L, readHW(hwFile2))
hwFile1.close()
hwFile2.close()
assertEquals(120L, hwFile1.read(topic, 0))
assertEquals(120L, hwFile2.read(topic, 0))
servers.map(server => Utils.rm(server.config.logDir))
}
@ -280,9 +245,4 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
producer.send(new ProducerData[Int, Message](topic, 0, sent1))
}
}
private def readHW(hwFile: RandomAccessFile): Long = {
hwFile.seek(0)
hwFile.readLong()
}
}
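For contrast with the removed readHW helper above, which seeked to the start of a per-partition binary file and read a single long, here is a usage sketch of the directory-level checkpoint (reusing the HwCheckpointSketch class sketched earlier; the paths and values are illustrative):

import java.nio.file.Files

// Usage sketch only; one checkpoint per log directory serves all partitions.
val dir = Files.createTempDirectory("kafka-logs-sketch").toString
val cp = new HwCheckpointSketch(dir)
cp.write(Map(("test-topic", 0) -> 60L, ("test-topic", 1) -> 30L))
assert(cp.read("test-topic", 0) == 60L)
assert(cp.read("unknown-topic", 0) == 0L) // absent entries default to 0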