From b49de724f12549354836c34aea6f2e0640994e07 Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Wed, 25 Jul 2012 23:42:07 +0000 Subject: [PATCH] KAFKA-405 Improve high watermark maintenance to store high watermarks for all partitions in a single .highwatermark file; patched by Neha Narkhede; reviewed by Jay Kreps and Jun Rao git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1365841 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/scala/kafka/cluster/Partition.scala | 9 +- .../main/scala/kafka/cluster/Replica.scala | 33 ++-- .../consumer/ZookeeperConsumerConnector.scala | 7 +- core/src/main/scala/kafka/log/Log.scala | 58 ++----- .../src/main/scala/kafka/log/LogManager.scala | 27 ++-- core/src/main/scala/kafka/log/LogStats.scala | 2 +- .../scala/kafka/message/FileMessageSet.scala | 9 +- .../server/HighwaterMarkCheckpoint.scala | 125 +++++++++++++++ .../main/scala/kafka/server/KafkaApis.scala | 2 +- .../main/scala/kafka/server/KafkaServer.scala | 26 ++-- .../scala/kafka/server/KafkaZooKeeper.scala | 2 +- .../scala/kafka/server/ReplicaManager.scala | 147 +++++++++++------- .../scala/kafka/utils/IteratorTemplate.scala | 1 - .../scala/kafka/utils/KafkaScheduler.scala | 32 ++-- core/src/main/scala/kafka/utils/Utils.scala | 28 +++- .../scala/unit/kafka/log/LogManagerTest.scala | 20 +-- .../unit/kafka/producer/ProducerTest.scala | 53 +++---- .../server/HighwatermarkPersistenceTest.scala | 146 +++++++++++++++++ .../unit/kafka/server/ISRExpirationTest.scala | 42 ++--- .../unit/kafka/server/LogRecoveryTest.scala | 104 ++++--------- 20 files changed, 577 insertions(+), 296 deletions(-) create mode 100644 core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala create mode 100644 core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 249ccf85bfb..0f5c7430028 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -56,7 +56,9 @@ class Partition(val topic: String, assignedReplicas } - def getReplica(replicaId: Int): Option[Replica] = assignedReplicas().find(_.brokerId == replicaId) + def getReplica(replicaId: Int): Option[Replica] = { + assignedReplicas().find(_.brokerId == replicaId) + } def addReplica(replica: Replica): Boolean = { if(!assignedReplicas.contains(replica)) { @@ -65,8 +67,7 @@ class Partition(val topic: String, }else false } - def updateReplicaLEO(replica: Replica, leo: Long) { - replica.leoUpdateTime = time.milliseconds + def updateReplicaLeo(replica: Replica, leo: Long) { replica.logEndOffset(Some(leo)) debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId)) } @@ -108,7 +109,7 @@ class Partition(val topic: String, val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset() < leaderReplica().logEndOffset()) info("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId, possiblyStuckReplicas.map(_.brokerId).mkString(","))) - val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime() < (time.milliseconds - keepInSyncTimeMs)) + val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime < (time.milliseconds - keepInSyncTimeMs)) info("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) val leader = leaderReplica() // Case 2 above diff --git a/core/src/main/scala/kafka/cluster/Replica.scala 
b/core/src/main/scala/kafka/cluster/Replica.scala index e1b64ea31ef..4f07688420f 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -18,43 +18,38 @@ package kafka.cluster import kafka.log.Log -import kafka.utils.Logging +import kafka.utils.{SystemTime, Time, Logging} import kafka.common.KafkaException class Replica(val brokerId: Int, val partition: Partition, val topic: String, - var log: Option[Log] = None, - var leoUpdateTime: Long = -1L) extends Logging { + time: Time = SystemTime, + var hw: Option[Long] = None, + var log: Option[Log] = None) extends Logging { private var logEndOffset: Long = -1L + private var logEndOffsetUpdateTimeMs: Long = -1L def logEndOffset(newLeo: Option[Long] = None): Long = { isLocal match { case true => newLeo match { - case Some(newOffset) => throw new KafkaException("Trying to set the leo %d for local log".format(newOffset)) + case Some(newOffset) => logEndOffsetUpdateTimeMs = time.milliseconds; newOffset case None => log.get.logEndOffset } case false => newLeo match { case Some(newOffset) => logEndOffset = newOffset + logEndOffsetUpdateTimeMs = time.milliseconds + trace("Setting log end offset for replica %d for topic %s partition %d to %d" + .format(brokerId, topic, partition.partitionId, logEndOffset)) logEndOffset case None => logEndOffset } } } - def logEndOffsetUpdateTime(time: Option[Long] = None): Long = { - time match { - case Some(t) => - leoUpdateTime = t - leoUpdateTime - case None => - leoUpdateTime - } - } - def isLocal: Boolean = { log match { case Some(l) => true @@ -62,6 +57,8 @@ class Replica(val brokerId: Int, } } + def logEndOffsetUpdateTime = logEndOffsetUpdateTimeMs + def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = { highwaterMarkOpt match { case Some(highwaterMark) => @@ -69,7 +66,7 @@ class Replica(val brokerId: Int, case true => trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId, brokerId, highwaterMark)) - log.get.setHW(highwaterMark) + hw = Some(highwaterMark) highwaterMark case false => throw new KafkaException("Unable to set highwatermark for topic %s ".format(topic) + "partition %d on broker %d, since there is no local log for this partition" @@ -78,7 +75,11 @@ class Replica(val brokerId: Int, case None => isLocal match { case true => - log.get.getHW() + hw match { + case Some(highWatermarkValue) => highWatermarkValue + case None => throw new KafkaException("HighWatermark does not exist for topic %s ".format(topic) + + " partition %d on broker %d but local log exists".format(partition.partitionId, brokerId)) + } case false => throw new KafkaException("Unable to get highwatermark for topic %s ".format(topic) + "partition %d on broker %d, since there is no local log for this partition" .format(partition.partitionId, brokerId)) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index d99c289d6aa..01c730c9b8b 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -93,9 +93,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var fetcher: Option[ConsumerFetcherManager] = None private var zkClient: ZkClient = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] - // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue private val 
topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]] - private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false) + private val scheduler = new KafkaScheduler(1) private val messageStreamCreated = new AtomicBoolean(false) private var sessionExpirationListener: ZKSessionExpireListener = null @@ -121,8 +120,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, connectZk() createFetcher() if (config.autoCommit) { + scheduler.startUp info("starting auto committer every " + config.autoCommitIntervalMs + " ms") - scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs) + scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs, + config.autoCommitIntervalMs, false) } def this(config: ConsumerConfig) = this(config, true) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 1b97f83c46c..c8a24df0ca2 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -18,7 +18,7 @@ package kafka.log import kafka.api.OffsetRequest -import java.io.{IOException, RandomAccessFile, File} +import java.io.{IOException, File} import java.util.{Comparator, Collections, ArrayList} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger} import kafka.utils._ @@ -105,10 +105,8 @@ class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long * of data to be deleted, we have to compute the offset relative to start of the log segment * @param offset Absolute offset for this partition */ - def truncateUpto(offset: Long) = { - assert(offset >= start, "Offset %d used for truncating this log segment cannot be smaller than the start offset %d". - format(offset, start)) - messageSet.truncateUpto(offset - start) + def truncateTo(offset: Long) = { + messageSet.truncateTo(offset - start) } } @@ -134,13 +132,6 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In /* The actual segments of the log */ private[log] val segments: SegmentList[LogSegment] = loadSegments() - /* create the leader highwatermark file handle */ - private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw") - info("Created highwatermark file %s for log %s".format(dir.getAbsolutePath + "/" + hwFileName, name)) - - /* If hw file is absent, the hw defaults to 0. If it exists, hw is set to the checkpointed value */ - private var hw: Long = if(hwFile.length() > 0) hwFile.readLong() else { hwFile.writeLong(0); 0 } - private val logStats = new LogStats(this) Utils.registerMBean(logStats, "kafka:type=kafka.logs." 
+ dir.getName) @@ -221,8 +212,6 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In info("Closing log segment " + seg.file.getAbsolutePath) seg.messageSet.close() } - checkpointHW() - hwFile.close() } } @@ -363,7 +352,6 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In segments.view.last.messageSet.flush() unflushed.set(0) lastflushedTime.set(System.currentTimeMillis) - checkpointHW() } } @@ -435,47 +423,31 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In total } - def recoverUptoLastCheckpointedHW() { - if(hwFile.length() > 0) { - // read the last checkpointed hw from disk - hwFile.seek(0) - val lastKnownHW = hwFile.readLong() - info("Recovering log %s upto highwatermark %d".format(name, lastKnownHW)) + def truncateTo(targetOffset: Long) { // find the log segment that has this hw val segmentToBeTruncated = segments.view.find(segment => - lastKnownHW >= segment.start && lastKnownHW < segment.absoluteEndOffset) + targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset) segmentToBeTruncated match { case Some(segment) => - segment.truncateUpto(lastKnownHW) - info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, hw)) + val truncatedSegmentIndex = segments.view.indexOf(segment) + segments.truncLast(truncatedSegmentIndex) + segment.truncateTo(targetOffset) + info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset)) case None => - assert(lastKnownHW <= logEndOffset, + assert(targetOffset <= segments.view.last.absoluteEndOffset, "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s". - format(lastKnownHW, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath)) + format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath)) error("Cannot truncate log to %d since the log start offset is %d and end offset is %d" - .format(lastKnownHW, segments.view.head.start, logEndOffset)) + .format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset)) } - val segmentsToBeDeleted = segments.view.filter(segment => segment.start > lastKnownHW) + val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset) + if(segmentsToBeDeleted.size < segments.view.size) { val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted) if(numSegmentsDeleted != segmentsToBeDeleted.size) error("Failed to delete some segments during log recovery") - }else - info("Unable to recover log upto hw. 
No previously checkpointed high watermark found for " + name) - } - - def setHW(latestLeaderHW: Long) { - hw = latestLeaderHW - } - - def getHW(): Long = hw - - def checkpointHW() { - hwFile.seek(0) - hwFile.writeLong(hw) - hwFile.getChannel.force(true) - info("Checkpointed highwatermark %d for log %s".format(hw, name)) + } } def topicName():String = { diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 1203adc11e0..90026fb279e 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -30,6 +30,7 @@ import kafka.common.{KafkaException, InvalidTopicException, InvalidPartitionExce */ @threadsafe private[kafka] class LogManager(val config: KafkaConfig, + scheduler: KafkaScheduler, private val time: Time, val logCleanupIntervalMs: Long, val logCleanupDefaultAgeMs: Long, @@ -40,11 +41,9 @@ private[kafka] class LogManager(val config: KafkaConfig, private val maxSize: Long = config.logFileSize private val flushInterval = config.flushInterval private val logCreationLock = new Object - private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false) private val logFlushIntervals = config.flushIntervalMap private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms private val logRetentionSize = config.logRetentionSize - private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false) /* Initialize a log for each subdirectory of the main log directory */ private val logs = new Pool[String, Pool[Int, Log]]() @@ -76,17 +75,13 @@ private[kafka] class LogManager(val config: KafkaConfig, def startup() { /* Schedule the cleanup task to delete old logs */ if(scheduler != null) { - if(scheduler.hasShutdown) { - println("Restarting log cleaner scheduler") - scheduler.startUp - } - info("starting log cleaner every " + logCleanupIntervalMs + " ms") - scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) + info("Starting log cleaner every " + logCleanupIntervalMs + " ms") + scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false) + info("Starting log flusher every " + config.flushSchedulerThreadRate + + " ms with the following overrides " + logFlushIntervals) + scheduler.scheduleWithRate(flushAllLogs, "kafka-logflusher-", + config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false) } - - if(logFlusherScheduler.hasShutdown) logFlusherScheduler.startUp - info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervals) - logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate) } @@ -95,7 +90,7 @@ private[kafka] class LogManager(val config: KafkaConfig, */ private def createLog(topic: String, partition: Int): Log = { if (topic.length <= 0) - throw new InvalidTopicException("topic name can't be emtpy") + throw new InvalidTopicException("Topic name can't be emtpy") if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) { val error = "Wrong partition %d, valid partitions (0, %d)." 
.format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1)) @@ -207,10 +202,8 @@ private[kafka] class LogManager(val config: KafkaConfig, /** * Close all the logs */ - def close() { + def shutdown() { info("Closing log manager") - scheduler.shutdown() - logFlusherScheduler.shutdown() allLogs.foreach(_.close()) } @@ -220,7 +213,7 @@ private[kafka] class LogManager(val config: KafkaConfig, def allLogs() = logs.values.flatMap(_.values) private def flushAllLogs() = { - debug("flushing the high watermark of all logs") + debug("Flushing the high watermark of all logs") for (log <- allLogs) { try{ diff --git a/core/src/main/scala/kafka/log/LogStats.scala b/core/src/main/scala/kafka/log/LogStats.scala index 13b4a8f2a85..599d6259f17 100644 --- a/core/src/main/scala/kafka/log/LogStats.scala +++ b/core/src/main/scala/kafka/log/LogStats.scala @@ -36,7 +36,7 @@ class LogStats(val log: Log) extends LogStatsMBean { def getNumberOfSegments: Int = log.numberOfSegments - def getCurrentOffset: Long = log.getHW() + def getCurrentOffset: Long = log.logEndOffset def getNumAppendedMessages: Long = numCumulatedMessages.get diff --git a/core/src/main/scala/kafka/message/FileMessageSet.scala b/core/src/main/scala/kafka/message/FileMessageSet.scala index 09a0e027ba5..362547c5fd3 100644 --- a/core/src/main/scala/kafka/message/FileMessageSet.scala +++ b/core/src/main/scala/kafka/message/FileMessageSet.scala @@ -196,9 +196,12 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel, len - validUpTo } - def truncateUpto(hw: Long) = { - channel.truncate(hw) - setSize.set(hw) + def truncateTo(targetSize: Long) = { + if(targetSize >= sizeInBytes()) + throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) + + " size of this log segment is only %d bytes".format(sizeInBytes())) + channel.truncate(targetSize) + setSize.set(targetSize) } /** diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala new file mode 100644 index 00000000000..56db0d5241c --- /dev/null +++ b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.utils.Logging +import java.util.concurrent.locks.ReentrantLock +import java.io._ + +/** + * This class handles the read/write to the highwaterMark checkpoint file. The file stores the high watermark value for + * all topics and partitions that this broker hosts. 
The format of this file is as follows - + * version + * number of entries + * topic partition highwatermark + */ + +object HighwaterMarkCheckpoint { + val highWatermarkFileName = ".highwatermark" + val currentHighwaterMarkFileVersion = 0 +} + +class HighwaterMarkCheckpoint(val path: String) extends Logging { + /* create the highwatermark file handle for all partitions */ + val name = path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName + private val hwFile = new File(name) + private val hwFileLock = new ReentrantLock() + // recover from previous tmp file, if required + + def write(highwaterMarksPerPartition: Map[(String, Int), Long]) { + hwFileLock.lock() + try { + // write to temp file and then swap with the highwatermark file + val tempHwFile = new File(hwFile + ".tmp") + // it is an error for this file to be present. It could mean that the previous rename operation failed + if(tempHwFile.exists()) { + fatal("Temporary high watermark %s file exists. This could mean that the ".format(tempHwFile.getAbsolutePath) + + "previous high watermark checkpoint operation has failed.") + System.exit(1) + } + val hwFileWriter = new BufferedWriter(new FileWriter(tempHwFile)) + // checkpoint highwatermark for all partitions + // write the current version + hwFileWriter.write(HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion.toString) + hwFileWriter.newLine() + // write the number of entries in the highwatermark file + hwFileWriter.write(highwaterMarksPerPartition.size.toString) + hwFileWriter.newLine() + + highwaterMarksPerPartition.foreach { partitionAndHw => + val topic = partitionAndHw._1._1 + val partitionId = partitionAndHw._1._2 + hwFileWriter.write("%s %s %s".format(topic, partitionId, partitionAndHw._2)) + hwFileWriter.newLine() + } + hwFileWriter.flush() + hwFileWriter.close() + // swap new high watermark file with previous one + hwFile.delete() + if(!tempHwFile.renameTo(hwFile)) { + fatal("Attempt to swap the new high watermark file with the old one failed") + System.exit(1) + } + }finally { + hwFileLock.unlock() + } + } + + def read(topic: String, partition: Int): Long = { + hwFileLock.lock() + try { + hwFile.length() match { + case 0 => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + + "partition %d. Returning 0 as the highwatermark".format(partition)) + 0L + case _ => + val hwFileReader = new BufferedReader(new FileReader(hwFile)) + val version = hwFileReader.readLine().toShort + version match { + case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion => + val numberOfHighWatermarks = hwFileReader.readLine().toInt + val partitionHighWatermarks = + for(i <- 0 until numberOfHighWatermarks) yield { + val nextHwEntry = hwFileReader.readLine() + val partitionHwInfo = nextHwEntry.split(" ") + val highwaterMark = partitionHwInfo.last.toLong + val partitionId = partitionHwInfo.takeRight(2).head + // find the index of partition + val partitionIndex = nextHwEntry.indexOf(partitionId) + val topic = nextHwEntry.substring(0, partitionIndex-1) + ((topic, partitionId.toInt) -> highwaterMark) + } + hwFileReader.close() + val hwOpt = partitionHighWatermarks.toMap.get((topic, partition)) + hwOpt match { + case Some(hw) => debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file" + .format(hw, topic, partition)) + hw + case None => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + + "partition %d. 
Returning 0 as the highwatermark".format(partition)) + 0L + } + case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version) + System.exit(1) + -1L + } + } + }finally { + hwFileLock.unlock() + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d305dea4ec6..898f408ec39 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -171,7 +171,7 @@ class KafkaApis(val requestChannel: RequestChannel, kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition) val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition) log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) - replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition) + replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset) offsets(msgIndex) = log.logEndOffset errors(msgIndex) = ErrorMapping.NoError.toShort trace(partitionData.messages.sizeInBytes + " bytes written to logs.") diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 8e91e4e836d..3ae4c38f7cf 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -40,11 +40,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg private val statsMBeanName = "kafka:type=kafka.SocketServerStats" var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null - private var logManager: LogManager = null + var logManager: LogManager = null var kafkaZookeeper: KafkaZooKeeper = null - private var replicaManager: ReplicaManager = null + var replicaManager: ReplicaManager = null private var apis: KafkaApis = null var kafkaController: KafkaController = new KafkaController(config) + val kafkaScheduler = new KafkaScheduler(4) var zkClient: ZkClient = null /** @@ -62,9 +63,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg cleanShutDownFile.delete } /* start client */ - info("connecting to ZK: " + config.zkConnect) + info("Connecting to ZK: " + config.zkConnect) zkClient = KafkaZookeeperClient.getZookeeperClient(config) + /* start scheduler */ + kafkaScheduler.startUp + /* start log manager */ logManager = new LogManager(config, + kafkaScheduler, SystemTime, 1000L * 60 * config.logCleanupIntervalMinutes, 1000L * 60 * 60 * config.logRetentionHours, @@ -80,7 +85,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower) - replicaManager = new ReplicaManager(config, time, zkClient) + replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler) apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper) requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads) @@ -90,6 +95,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // starting relevant replicas and leader election for partitions assigned to this broker kafkaZookeeper.startup() + // start the replica manager + replicaManager.startup() + // start the controller kafkaController.startup() info("Server started.") @@ -103,23 +111,21 @@ class 
KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { info("Shutting down Kafka server with id " + config.brokerId) + kafkaScheduler.shutdown() apis.close() if(replicaManager != null) - replicaManager.close() + replicaManager.shutdown() if (socketServer != null) socketServer.shutdown() if(requestHandlerPool != null) requestHandlerPool.shutdown() Utils.unregisterMBean(statsMBeanName) if(logManager != null) - logManager.close() + logManager.shutdown() if(kafkaController != null) kafkaController.shutDown() - - kafkaZookeeper.close - info("Closing zookeeper client...") + kafkaZookeeper.shutdown() zkClient.close() - val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile) debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath()) cleanShutDownFile.createNewFile diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index e7136dee450..84127992ec7 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -104,7 +104,7 @@ class KafkaZooKeeper(config: KafkaConfig, } } - def close() { + def shutdown() { stateChangeHandler.shutdown() } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0abd5bf3320..2f59d8b02a0 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -13,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
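// Aside on the on-disk format: the .highwatermark file managed by HighwaterMarkCheckpoint (added above)
// holds a version number, an entry count, and then one "topic partition highwatermark" line per hosted
// partition. A minimal sketch of that layout using a hypothetical helper object; the real class also
// writes through a temp file, renames it into place, and guards everything with a lock:
object HighwatermarkFileFormatSketch {
  val version = 0
  // render a map of (topic, partition) -> highwatermark into the checkpoint layout
  def render(hws: Map[(String, Int), Long]): String = {
    val body = hws.map { case ((topic, partition), hw) => "%s %d %d".format(topic, partition, hw) }
    (Seq(version.toString, hws.size.toString) ++ body).mkString("\n")
  }
  // parse the layout back, assuming topic names contain no spaces
  def parse(contents: String): Map[(String, Int), Long] = {
    val lines = contents.split("\n")
    require(lines(0).toInt == version, "unsupported highwatermark file version " + lines(0))
    lines.drop(2).map { line =>
      val Array(topic, partition, hw) = line.split(" ")
      (topic, partition.toInt) -> hw.toLong
    }.toMap
  }
}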
-*/ + */ package kafka.server import kafka.log.Log @@ -25,21 +25,28 @@ import java.util.concurrent.locks.ReentrantLock import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging} import kafka.common.{KafkaException, InvalidPartitionException} -class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging { +class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient, kafkaScheduler: KafkaScheduler) + extends Logging { private var allReplicas = new mutable.HashMap[(String, Int), Partition]() private var leaderReplicas = new ListBuffer[Partition]() private val leaderReplicaLock = new ReentrantLock() - private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true) private val replicaFetcherManager = new ReplicaFetcherManager(config, this) + private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) + info("Created highwatermark file %s on broker %d".format(highwaterMarkCheckpoint.name, config.brokerId)) - // start ISR expiration thread - isrExpirationScheduler.startUp - isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.replicaMaxLagTimeMs) + def startup() { + // start the highwatermark checkpoint thread + kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0, + config.defaultFlushIntervalMs) + // start ISR expiration thread + kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs) + } def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = { val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds) - val localReplica = new Replica(config.brokerId, partition, topic, Some(log)) + val localReplica = new Replica(config.brokerId, partition, topic, time, + Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log)) val replicaOpt = partition.getReplica(config.brokerId) replicaOpt match { @@ -81,12 +88,12 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex case Some(partition) => partition case None => throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d" - .format(topic, partitionId, config.brokerId)) + .format(topic, partitionId, config.brokerId)) } } def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = { - val remoteReplica = new Replica(replicaId, partition, topic) + val remoteReplica = new Replica(replicaId, partition, topic, time) val replicaAdded = partition.addReplica(remoteReplica) if(replicaAdded) @@ -112,20 +119,17 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex Some(replicas.leaderReplica()) case None => throw new KafkaException("Getting leader replica failed. 
Partition replica metadata for topic " + - "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId)) + "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId)) } } - def getPartition(topic: String, partitionId: Int): Option[Partition] = - allReplicas.get((topic, partitionId)) - - def updateReplicaLEO(replica: Replica, fetchOffset: Long) { + private def updateReplicaLeo(replica: Replica, fetchOffset: Long) { // set the replica leo val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId) - partition.updateReplicaLEO(replica, fetchOffset) + partition.updateReplicaLeo(replica, fetchOffset) } - def maybeIncrementLeaderHW(replica: Replica) { + private def maybeIncrementLeaderHW(replica: Replica) { // set the replica leo val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId) // set the leader HW to min of the leo of all replicas @@ -141,58 +145,70 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex } def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) { - // read and cache the ISR - replica.partition.leaderId(Some(replica.brokerId)) - replica.partition.updateISR(currentISRInZk.toSet) - // stop replica fetcher thread, if any - replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) - // also add this partition to the list of partitions for which the leader is the current broker + info("Broker %d started the leader state transition for topic %s partition %d" + .format(config.brokerId, replica.topic, replica.partition.partitionId)) try { + // read and cache the ISR + replica.partition.leaderId(Some(replica.brokerId)) + replica.partition.updateISR(currentISRInZk.toSet) + // stop replica fetcher thread, if any + replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) + // also add this partition to the list of partitions for which the leader is the current broker leaderReplicaLock.lock() leaderReplicas += replica.partition + info("Broker %d completed the leader state transition for topic %s partition %d" + .format(config.brokerId, replica.topic, replica.partition.partitionId)) + }catch { + case e => error("Broker %d failed to complete the leader state transition for topic %s partition %d" + .format(config.brokerId, replica.topic, replica.partition.partitionId), e) }finally { leaderReplicaLock.unlock() } } def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) { - info("broker %d intending to follow leader %d for topic %s partition %d" + info("Broker %d starting the follower state transition to follow leader %d for topic %s partition %d" .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) - // set the leader for this partition correctly on this broker - replica.partition.leaderId(Some(leaderBrokerId)) - // remove this replica's partition from the ISR expiration queue try { + // set the leader for this partition correctly on this broker + replica.partition.leaderId(Some(leaderBrokerId)) + replica.log match { + case Some(log) => // log is already started + log.truncateTo(replica.highWatermark()) + case None => + } + // get leader for this replica + val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head + val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId) + // become follower only if it is not already following the same leader 
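// A note on ordering in the follower transition above: the local log is truncated back to the last known
// high watermark before the fetcher is (re)started, because only offsets up to the high watermark are
// guaranteed to be committed on the leader; anything beyond it may diverge and is simply refetched. The
// check below only avoids tearing down and restarting the fetcher when the leader has not actually changed.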
+ if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) { + info("broker %d becoming follower to leader %d for topic %s partition %d" + .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) + // stop fetcher thread to previous leader + replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) + // start fetcher thread to current leader + replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker) + } + // remove this replica's partition from the ISR expiration queue leaderReplicaLock.lock() leaderReplicas -= replica.partition + info("Broker %d completed the follower state transition to follow leader %d for topic %s partition %d" + .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) + }catch { + case e => error("Broker %d failed to complete the follower state transition to follow leader %d for topic %s partition %d" + .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId), e) }finally { leaderReplicaLock.unlock() } - replica.log match { - case Some(log) => // log is already started - log.recoverUptoLastCheckpointedHW() - case None => - } - // get leader for this replica - val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head - val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId) - // Become follower only if it is not already following the same leader - if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) { - info("broker %d becoming follower to leader %d for topic %s partition %d" - .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) - // stop fetcher thread to previous leader - replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) - // start fetcher thread to current leader - replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker) - } } - def maybeShrinkISR(): Unit = { + private def maybeShrinkISR(): Unit = { try { info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR" .format(config.replicaMaxLagTimeMs)) leaderReplicaLock.lock() leaderReplicas.foreach { partition => - // shrink ISR if a follower is slow or stuck + // shrink ISR if a follower is slow or stuck val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes) if(outOfSyncReplicas.size > 0) { val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas @@ -210,7 +226,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex } } - def checkIfISRCanBeExpanded(replica: Replica): Boolean = { + private def checkIfISRCanBeExpanded(replica: Replica): Boolean = { val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId) if(partition.inSyncReplicas.contains(replica)) false else if(partition.assignedReplicas().contains(replica)) { @@ -225,7 +241,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex val replicaOpt = getReplica(topic, partition, replicaId) replicaOpt match { case Some(replica) => - updateReplicaLEO(replica, offset) + updateReplicaLeo(replica, offset) // check if this replica needs to be added to the ISR if(checkIfISRCanBeExpanded(replica)) { val newISR = replica.partition.inSyncReplicas + replica @@ -239,20 
+255,45 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex } } - def recordLeaderLogUpdate(topic: String, partition: Int) = { + def recordLeaderLogEndOffset(topic: String, partition: Int, logEndOffset: Long) = { val replicaOpt = getReplica(topic, partition, config.brokerId) replicaOpt match { - case Some(replica) => - replica.logEndOffsetUpdateTime(Some(time.milliseconds)) + case Some(replica) => replica.logEndOffset(Some(logEndOffset)) case None => throw new KafkaException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId)) } } - def close() { - info("Closing replica manager on broker " + config.brokerId) - isrExpirationScheduler.shutdown() + /** + * Flushes the highwatermark value for all partitions to the highwatermark file + */ + private def checkpointHighwaterMarks() { + val highwaterMarksForAllPartitions = allReplicas.map { partition => + val topic = partition._1._1 + val partitionId = partition._1._2 + val localReplicaOpt = partition._2.getReplica(config.brokerId) + val hw = localReplicaOpt match { + case Some(localReplica) => localReplica.highWatermark() + case None => + error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) + + " Replica metadata doesn't exist in replica manager on broker " + config.brokerId) + 0L + } + (topic, partitionId) -> hw + }.toMap + highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions) + info("Checkpointed highwatermarks") + } + + /** + * Reads the checkpointed highWatermarks for all partitions + * @returns checkpointed value of highwatermark for topic, partition. If one doesn't exist, returns 0 + */ + def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition) + + def shutdown() { replicaFetcherManager.shutdown() + checkpointHighwaterMarks() info("Replica manager shutdown on broker " + config.brokerId) } } diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala b/core/src/main/scala/kafka/utils/IteratorTemplate.scala index ae7878a10ae..301f9346aff 100644 --- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala +++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala @@ -17,7 +17,6 @@ package kafka.utils -import kafka.common.KafkaException import java.lang.IllegalStateException class State diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 4a18cfdfba3..e3d915b3b48 100644 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -18,20 +18,24 @@ package kafka.utils import java.util.concurrent._ -import java.util.concurrent.atomic._ +import atomic._ +import collection.mutable.HashMap /** * A scheduler for running jobs in the background */ -class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) extends Logging { - private val threadId = new AtomicLong(0) +class KafkaScheduler(val numThreads: Int) extends Logging { private var executor:ScheduledThreadPoolExecutor = null - startUp + private val daemonThreadFactory = new ThreadFactory() { + def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, true) + } + private val nonDaemonThreadFactory = new ThreadFactory() { + def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, false) + } + private val threadNamesAndIds = new HashMap[String, AtomicInteger]() def startUp = { - executor = new 
ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() { - def newThread(runnable: Runnable): Thread = Utils.daemonThread(baseThreadName + threadId.getAndIncrement, runnable) - }) + executor = new ScheduledThreadPoolExecutor(numThreads) executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) } @@ -43,20 +47,26 @@ class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: throw new IllegalStateException("Kafka scheduler has not been started") } - def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) = { + def scheduleWithRate(fun: () => Unit, name: String, delayMs: Long, periodMs: Long, isDaemon: Boolean = true) = { ensureExecutorHasStarted - executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS) + if(isDaemon) + executor.setThreadFactory(daemonThreadFactory) + else + executor.setThreadFactory(nonDaemonThreadFactory) + val threadId = threadNamesAndIds.getOrElseUpdate(name, new AtomicInteger(0)) + executor.scheduleAtFixedRate(Utils.loggedRunnable(fun, name + threadId.incrementAndGet()), delayMs, periodMs, + TimeUnit.MILLISECONDS) } def shutdownNow() { ensureExecutorHasStarted executor.shutdownNow() - info("Forcing shutdown of scheduler " + baseThreadName) + info("Forcing shutdown of Kafka scheduler") } def shutdown() { ensureExecutorHasStarted executor.shutdown() - info("Shutdown scheduler " + baseThreadName) + info("Shutdown Kafka scheduler") } } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 24dfe162af7..bdbc6b18e5b 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -58,9 +58,10 @@ object Utils extends Logging { * @param fun A function * @return A Runnable that just executes the function */ - def loggedRunnable(fun: () => Unit): Runnable = + def loggedRunnable(fun: () => Unit, name: String): Runnable = new Runnable() { def run() = { + Thread.currentThread().setName(name) try { fun() } @@ -72,6 +73,14 @@ object Utils extends Logging { } } + /** + * Create a daemon thread + * @param runnable The runnable to execute in the background + * @return The unstarted thread + */ + def daemonThread(runnable: Runnable): Thread = + newThread(runnable, true) + /** * Create a daemon thread * @param name The name of the thread @@ -108,6 +117,23 @@ object Utils extends Logging { thread } + /** + * Create a new thread + * @param runnable The work for the thread to do + * @param daemon Should the thread block JVM shutdown? 
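// Aside: with the reworked KafkaScheduler above, a scheduler is started once and shared by its callers, and
// each periodic task passes its own thread-name prefix and daemon flag when it is scheduled. A small usage
// sketch under those assumptions (the task body is purely illustrative):
//
//   val scheduler = new KafkaScheduler(2)
//   scheduler.startUp
//   // run a named, non-daemon task every 5 seconds, starting after a 5 second delay
//   scheduler.scheduleWithRate(() => println("periodic work"), "example-task-", 5000, 5000, false)
//   // ... later, during shutdown
//   scheduler.shutdown()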
+ * @return The unstarted thread + */ + def newThread(runnable: Runnable, daemon: Boolean): Thread = { + val thread = new Thread(runnable) + thread.setDaemon(daemon) + thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + def uncaughtException(t: Thread, e: Throwable) { + error("Uncaught exception in thread '" + t.getName + "':", e) + } + }) + thread + } + /** * Read a byte array from the given offset and size in the buffer * TODO: Should use System.arraycopy diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index acc3fd84778..c5bc471400f 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -37,6 +37,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { val zookeeperConnect = TestZKUtils.zookeeperConnect val name = "kafka" val veryLargeLogFlushInterval = 10000000L + val scheduler = new KafkaScheduler(2) override def setUp() { super.setUp() @@ -45,7 +46,8 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { override val logFileSize = 1024 override val flushInterval = 100 } - logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false) + scheduler.startUp + logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false) logManager.startup logDir = logManager.logDir @@ -56,8 +58,9 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { } override def tearDown() { + scheduler.shutdown() if(logManager != null) - logManager.close() + logManager.shutdown() Utils.rm(logDir) super.tearDown() } @@ -114,7 +117,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { val retentionHours = 1 val retentionMs = 1000 * 60 * 60 * retentionHours val props = TestUtils.createBrokerConfig(0, -1) - logManager.close + logManager.shutdown() Thread.sleep(100) config = new KafkaConfig(props) { override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages @@ -122,7 +125,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { override val logRetentionHours = retentionHours override val flushInterval = 100 } - logManager = new LogManager(config, time, veryLargeLogFlushInterval, retentionMs, false) + logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, retentionMs, false) logManager.startup // create a log @@ -159,7 +162,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testTimeBasedFlush() { val props = TestUtils.createBrokerConfig(0, -1) - logManager.close + logManager.shutdown() Thread.sleep(100) config = new KafkaConfig(props) { override val logFileSize = 1024 *1024 *1024 @@ -167,7 +170,7 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { override val flushInterval = Int.MaxValue override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100") } - logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false) + logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false) logManager.startup val log = logManager.getOrCreateLog(name, 0) for(i <- 0 until 200) { @@ -182,15 +185,14 @@ class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testConfigurablePartitions() { val props = TestUtils.createBrokerConfig(0, -1) - logManager.close + logManager.shutdown() Thread.sleep(100) 
config = new KafkaConfig(props) { override val logFileSize = 256 override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2") override val flushInterval = 100 } - - logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false) + logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false) logManager.startup for(i <- 0 until 1) { diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 44827ca6bf1..c0e0edadb7a 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -13,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package kafka.producer @@ -35,7 +35,7 @@ import java.util class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { private val brokerId1 = 0 - private val brokerId2 = 1 + private val brokerId2 = 1 private val ports = TestUtils.choosePorts(2) private val (port1, port2) = (ports(0), ports(1)) private var server1: KafkaServer = null @@ -80,7 +80,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { server2.shutdown server2.awaitShutdown() Utils.rm(server1.config.logDir) - Utils.rm(server2.config.logDir) + Utils.rm(server2.config.logDir) Thread.sleep(500) super.tearDown() } @@ -97,7 +97,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { val props2 = new util.Properties() props2.putAll(props1) props2.put("producer.request.required.acks", "3") - props2.put("producer.request.ack.timeout.ms", "1000") + props2.put("producer.request.timeout.ms", "1000") val config1 = new ProducerConfig(props1) val config2 = new ProducerConfig(props2) @@ -108,33 +108,28 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness { val producer1 = new Producer[String, String](config1) val producer2 = new Producer[String, String](config2) - try { - // Available partition ids should be 0. - producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) - producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) - // get the leader - val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0) - assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined) - val leader = leaderOpt.get + // Available partition ids should be 0. 
+ producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) + producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) + // get the leader + val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0) + assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined) + val leader = leaderOpt.get - val messageSet = if(leader == server1.config.brokerId) { - val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) - response1.messageSet("new-topic", 0).iterator - }else { - val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) - response2.messageSet("new-topic", 0).iterator - } - assertTrue("Message set should have 1 message", messageSet.hasNext) - - assertEquals(new Message("test1".getBytes), messageSet.next.message) - assertTrue("Message set should have 1 message", messageSet.hasNext) - assertEquals(new Message("test1".getBytes), messageSet.next.message) - assertFalse("Message set should not have any more messages", messageSet.hasNext) - } catch { - case e: Exception => fail("Not expected", e) - } finally { - producer1.close() + val messageSet = if(leader == server1.config.brokerId) { + val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + response1.messageSet("new-topic", 0).iterator + }else { + val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + response2.messageSet("new-topic", 0).iterator } + assertTrue("Message set should have 1 message", messageSet.hasNext) + + assertEquals(new Message("test1".getBytes), messageSet.next.message) + assertTrue("Message set should have 1 message", messageSet.hasNext) + assertEquals(new Message("test1".getBytes), messageSet.next.message) + assertFalse("Message set should not have any more messages", messageSet.hasNext) + producer1.close() try { producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test2"))) diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala new file mode 100644 index 00000000000..782c2d0385b --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package kafka.server + +import kafka.log.Log +import org.I0Itec.zkclient.ZkClient +import org.scalatest.junit.JUnit3Suite +import org.easymock.EasyMock +import org.junit.Assert._ +import kafka.utils.{KafkaScheduler, TestUtils, MockTime} +import kafka.common.KafkaException + +class HighwatermarkPersistenceTest extends JUnit3Suite { + + val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { + override val defaultFlushIntervalMs = 100 + }) + val topic = "foo" + + def testHighWatermarkPersistenceSinglePartition() { + // mock zkclient + val zkClient = EasyMock.createMock(classOf[ZkClient]) + EasyMock.replay(zkClient) + // create kafka scheduler + val scheduler = new KafkaScheduler(2) + scheduler.startUp + // create replica manager + val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler) + replicaManager.startup() + // sleep until flush ms + Thread.sleep(configs.head.defaultFlushIntervalMs) + var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0) + assertEquals(0L, fooPartition0Hw) + val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet) + // create leader log + val log0 = getMockLog + // create leader and follower replicas + val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet) + val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0) + // sleep until flush ms + Thread.sleep(configs.head.defaultFlushIntervalMs) + fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0) + assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw) + try { + followerReplicaPartition0.highWatermark() + fail("Should fail with IllegalStateException") + }catch { + case e: KafkaException => // this is ok + } + // set the leader + partition0.leaderId(Some(leaderReplicaPartition0.brokerId)) + // set the highwatermark for local replica + partition0.leaderHW(Some(5L)) + // sleep until flush interval + Thread.sleep(configs.head.defaultFlushIntervalMs) + fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0) + assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw) + EasyMock.verify(zkClient) + EasyMock.verify(log0) + } + + def testHighWatermarkPersistenceMultiplePartitions() { + val topic1 = "foo1" + val topic2 = "foo2" + // mock zkclient + val zkClient = EasyMock.createMock(classOf[ZkClient]) + EasyMock.replay(zkClient) + // create kafka scheduler + val scheduler = new KafkaScheduler(2) + scheduler.startUp + // create replica manager + val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler) + replicaManager.startup() + // sleep until flush ms + Thread.sleep(configs.head.defaultFlushIntervalMs) + var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0) + assertEquals(0L, topic1Partition0Hw) + val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet) + // create leader log + val topic1Log0 = getMockLog + // create leader and follower replicas + val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet) + // sleep until flush ms + Thread.sleep(configs.head.defaultFlushIntervalMs) + topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0) + assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw) + // set the leader + 
topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId)) + // set the highwatermark for local replica + topic1Partition0.leaderHW(Some(5L)) + // sleep until flush interval + Thread.sleep(configs.head.defaultFlushIntervalMs) + topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0) + assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark()) + assertEquals(5L, topic1Partition0Hw) + // add another partition and set highwatermark + val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, configs.map(_.brokerId).toSet) + // create leader log + val topic2Log0 = getMockLog + // create leader and follower replicas + val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet) + // sleep until flush ms + Thread.sleep(configs.head.defaultFlushIntervalMs) + var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0) + assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw) + // set the leader + topic2Partition0.leaderId(Some(leaderReplicaTopic2Partition0.brokerId)) + // set the highwatermark for local replica + topic2Partition0.leaderHW(Some(15L)) + assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark()) + // change the highwatermark for topic1 + topic1Partition0.leaderHW(Some(10L)) + assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark()) + // sleep until flush interval + Thread.sleep(configs.head.defaultFlushIntervalMs) + // verify checkpointed hw for topic 2 + topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0) + assertEquals(15L, topic2Partition0Hw) + // verify checkpointed hw for topic 1 + topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0) + assertEquals(10L, topic1Partition0Hw) + EasyMock.verify(zkClient) + EasyMock.verify(topic1Log0) + EasyMock.verify(topic2Log0) + } + + private def getMockLog: Log = { + val log = EasyMock.createMock(classOf[kafka.log.Log]) + EasyMock.replay(log) + log + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 4038974cfca..2a58fdf4348 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -22,9 +22,9 @@ import collection.mutable.Map import kafka.cluster.{Partition, Replica} import org.easymock.EasyMock import kafka.log.Log -import kafka.utils.{Time, MockTime, TestUtils} import org.junit.Assert._ import org.I0Itec.zkclient.ZkClient +import kafka.utils.{KafkaScheduler, Time, MockTime, TestUtils} class ISRExpirationTest extends JUnit3Suite { @@ -40,7 +40,6 @@ class ISRExpirationTest extends JUnit3Suite { // create leader replica val log = EasyMock.createMock(classOf[kafka.log.Log]) EasyMock.expect(log.logEndOffset).andReturn(5L).times(12) - EasyMock.expect(log.setHW(5L)).times(1) EasyMock.replay(log) // add one partition @@ -48,10 +47,10 @@ class ISRExpirationTest extends JUnit3Suite { assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get // set remote replicas leo to something low, like 2 - (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 2)) + (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 2)) time.sleep(150) - 
+    leaderReplica.logEndOffset(Some(5L))
     var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
 
@@ -60,9 +59,9 @@ class ISRExpirationTest extends JUnit3Suite {
     partition0.inSyncReplicas ++= partition0.assignedReplicas()
     assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
 
-    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    leaderReplica.logEndOffset(Some(5L))
     // let the follower catch up only upto 3
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 3))
     time.sleep(150)
     // now follower broker id 1 has caught upto only 3, while the leader is at 5 AND follower broker id 1 hasn't
     // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
@@ -80,12 +79,12 @@ class ISRExpirationTest extends JUnit3Suite {
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
     // set remote replicas leo to something low, like 4
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 4))
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 4))
     time.sleep(150)
-    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    leaderReplica.logEndOffset(Some(15L))
     time.sleep(10)
-    (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+    (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffset(Some(4)))
 
     val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
 
@@ -98,8 +97,10 @@ class ISRExpirationTest extends JUnit3Suite {
     // mock zkclient
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     EasyMock.replay(zkClient)
+    // create kafka scheduler
+    val scheduler = new KafkaScheduler(2)
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, time, zkClient)
+    val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler)
     try {
       val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
       // create leader log
@@ -115,7 +116,7 @@ class ISRExpirationTest extends JUnit3Suite {
       partition0.leaderHW(Some(5L))
 
       // set the leo for non-leader replicas to something low
-      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLEO(r, 2))
+      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLeo(r, 2))
 
       val log1 = getLogWithHW(15L)
       // create leader and follower replicas for partition 1
@@ -129,13 +130,13 @@ class ISRExpirationTest extends JUnit3Suite {
       partition1.leaderHW(Some(15L))
 
       // set the leo for non-leader replicas to something low
-      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLEO(r, 4))
+      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLeo(r, 4))
       time.sleep(150)
-      leaderReplicaPartition0.logEndOffsetUpdateTime(Some(time.milliseconds))
-      leaderReplicaPartition1.logEndOffsetUpdateTime(Some(time.milliseconds))
+      leaderReplicaPartition0.logEndOffset(Some(4L))
+      leaderReplicaPartition1.logEndOffset(Some(4L))
       time.sleep(10)
-      (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+      (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffset(Some(4L)))
 
       val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
       assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
 
@@ -148,16 +149,16 @@ class ISRExpirationTest extends JUnit3Suite {
     }catch {
      case e => e.printStackTrace()
     }finally {
-      replicaManager.close()
+      replicaManager.shutdown()
     }
   }
 
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
                                                localLog: Log, leaderHW: Long): Partition = {
     val partition = new Partition(topic, partitionId, time)
-    val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog))
+    val leaderReplica = new Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog))
 
-    val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica
+    val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
     partition.assignedReplicas(Some(allReplicas.toSet))
     // set in sync replicas for this partition to all the assigned replicas
     partition.inSyncReplicas = allReplicas.toSet
@@ -170,15 +171,14 @@ class ISRExpirationTest extends JUnit3Suite {
 
   private def getLogWithHW(hw: Long): Log = {
     val log1 = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
-    EasyMock.expect(log1.setHW(hw)).times(1)
     EasyMock.replay(log1)
     log1
   }
 
-  private def getFollowerReplicas(partition: Partition, leaderId: Int): Seq[Replica] = {
+  private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
     configs.filter(_.brokerId != leaderId).map { config =>
-      new Replica(config.brokerId, partition, topic)
+      new Replica(config.brokerId, partition, topic, time)
     }
   }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index a30a4f7e429..6cd1dbe0ab8 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -7,7 +7,6 @@ import kafka.utils.TestUtils._
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.message.Message
-import java.io.RandomAccessFile
 import kafka.producer.{ProducerConfig, ProducerData, Producer}
 import org.junit.Test
 
@@ -34,15 +33,12 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configProps1 = configs.head
   val configProps2 = configs.last
 
-  val server1HWFile = configProps1.logDir + "/" + topic + "-0/highwatermark"
-  val server2HWFile = configProps2.logDir + "/" + topic + "-0/highwatermark"
-
   val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
   val sent2 = List(new Message("more".getBytes()), new Message("messages".getBytes()))
 
   var producer: Producer[Int, Message] = null
-  var hwFile1: RandomAccessFile = null
-  var hwFile2: RandomAccessFile = null
+  var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir)
+  var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   @Test
@@ -66,22 +62,15 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
-    sendMessages()
-
-    hwFile1 = new RandomAccessFile(server1HWFile, "r")
-    hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
-    sendMessages()
+    sendMessages(2)
     // don't wait for follower to read the leader's hw
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
-    val leaderHW = readHW(hwFile1)
+    val leaderHW = hwFile1.read(topic, 0)
     assertEquals(60L, leaderHW)
-    val followerHW = readHW(hwFile2)
+    val followerHW = hwFile2.read(topic, 0)
     assertEquals(30L, followerHW)
-    hwFile1.close()
-    hwFile2.close()
     servers.map(server => Utils.rm(server.config.logDir))
   }
 
@@ -105,16 +94,13 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
-    hwFile1 = new RandomAccessFile(server1HWFile, "r")
-    hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
-    assertEquals(0L, readHW(hwFile1))
+    assertEquals(0L, hwFile1.read(topic, 0))
 
     sendMessages()
     // kill the server hosting the preferred replica
     server1.shutdown()
-    assertEquals(30L, readHW(hwFile1))
+    assertEquals(30L, hwFile1.read(topic, 0))
 
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
@@ -126,10 +112,10 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
 
-    assertEquals(30L, readHW(hwFile1))
+    assertEquals(30L, hwFile1.read(topic, 0))
     // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
     server2.shutdown()
-    assertEquals(30L, readHW(hwFile2))
+    assertEquals(30L, hwFile2.read(topic, 0))
 
     server2.startup()
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
@@ -137,17 +123,13 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     sendMessages()
     // give some time for follower 1 to record leader HW of 60
-    Thread.sleep(500)
+    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 500)
+
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
-    Thread.sleep(200)
     producer.close()
-    assert(hwFile1.length() > 0)
-    assert(hwFile2.length() > 0)
-    assertEquals(60L, readHW(hwFile1))
-    assertEquals(60L, readHW(hwFile2))
-    hwFile1.close()
-    hwFile2.close()
+    assertEquals(60L, hwFile1.read(topic, 0))
+    assertEquals(60L, hwFile2.read(topic, 0))
     servers.map(server => Utils.rm(server.config.logDir))
   }
 
@@ -160,14 +142,14 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
       override val logFileSize = 30
     })
 
-    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
-    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
-
     // start both servers
     server1 = TestUtils.createServer(configs.head)
     server2 = TestUtils.createServer(configs.last)
     servers ++= List(server1, server2)
 
+    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
+    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+
     val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
     producerProps.put("producer.request.required.acks", "-1")
@@ -181,25 +163,16 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
-
-    sendMessages(10)
-
-    hwFile1 = new RandomAccessFile(server1HWFile, "r")
-    hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
-    sendMessages(10)
-
+    sendMessages(20)
     // give some time for follower 1 to record leader HW of 600
-    Thread.sleep(500)
+    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 600L, 500)
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
-    val leaderHW = readHW(hwFile1)
+    val leaderHW = hwFile1.read(topic, 0)
     assertEquals(600L, leaderHW)
-    val followerHW = readHW(hwFile2)
+    val followerHW = hwFile2.read(topic, 0)
     assertEquals(600L, followerHW)
-    hwFile1.close()
-    hwFile2.close()
     servers.map(server => Utils.rm(server.config.logDir))
   }
 
@@ -208,19 +181,18 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
       override val replicaMaxLagTimeMs = 5000L
       override val replicaMaxLagBytes = 10L
       override val flushInterval = 1000
-      override val flushSchedulerThreadRate = 10
       override val replicaMinBytes = 20
       override val logFileSize = 30
     })
 
-    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
-    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
-
     // start both servers
     server1 = TestUtils.createServer(configs.head)
     server2 = TestUtils.createServer(configs.last)
     servers ++= List(server1, server2)
 
+    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
+    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+
     val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
     producerProps.put("producer.request.required.acks", "-1")
@@ -235,43 +207,36 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
-    val hwFile1 = new RandomAccessFile(server1HWFile, "r")
-    val hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
     sendMessages(2)
     // allow some time for the follower to get the leader HW
-    Thread.sleep(1000)
+    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000)
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
-    assertEquals(60L, readHW(hwFile1))
-    assertEquals(60L, readHW(hwFile2))
+    assertEquals(60L, hwFile1.read(topic, 0))
+    assertEquals(60L, hwFile2.read(topic, 0))
 
     server2.startup()
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
-    assertEquals(60L, readHW(hwFile1))
+    assertEquals(60L, hwFile1.read(topic, 0))
 
     // bring the preferred replica back
     server1.startup()
-    assertEquals(60L, readHW(hwFile1))
-    assertEquals(60L, readHW(hwFile2))
+    assertEquals(60L, hwFile1.read(topic, 0))
+    assertEquals(60L, hwFile2.read(topic, 0))
 
     sendMessages(2)
     // allow some time for the follower to get the leader HW
-    Thread.sleep(1000)
+    TestUtils.waitUntilTrue(() => server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000)
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
-    assert(hwFile1.length() > 0)
-    assert(hwFile2.length() > 0)
-    assertEquals(120L, readHW(hwFile1))
-    assertEquals(120L, readHW(hwFile2))
-    hwFile1.close()
-    hwFile2.close()
+    assertEquals(120L, hwFile1.read(topic, 0))
+    assertEquals(120L, hwFile2.read(topic, 0))
     servers.map(server => Utils.rm(server.config.logDir))
   }
@@ -280,9 +245,4 @@
       producer.send(new ProducerData[Int, Message](topic, 0, sent1))
     }
   }
-
-  private def readHW(hwFile: RandomAccessFile): Long = {
-    hwFile.seek(0)
-    hwFile.readLong()
-  }
 }
\ No newline at end of file