From 4be0b1be29e301500515b6c5e00655b06e829045 Mon Sep 17 00:00:00 2001 From: Edward Jay Kreps Date: Sun, 2 Dec 2012 20:50:01 +0000 Subject: [PATCH] KAFKA-521 Refactor the log subsystem. Patch reviewed by Neha. git-svn-id: https://svn.apache.org/repos/asf/kafka/trunk@1416253 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/scala/kafka/log/FileMessageSet.scala | 112 ++- core/src/main/scala/kafka/log/Log.scala | 649 ++++++++---------- .../src/main/scala/kafka/log/LogManager.scala | 78 +-- .../src/main/scala/kafka/log/LogSegment.scala | 171 +++-- .../main/scala/kafka/log/OffsetIndex.scala | 24 +- core/src/main/scala/kafka/log/package.html | 7 +- .../main/scala/kafka/message/MessageSet.scala | 14 +- .../scala/kafka/producer/ProducerConfig.scala | 12 +- .../main/scala/kafka/server/KafkaApis.scala | 92 ++- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../scala/kafka/server/LeaderElector.scala | 2 - .../kafka/server/ReplicaFetcherThread.scala | 2 +- .../scala/kafka/tools/KafkaMigrationTool.java | 2 +- .../main/scala/kafka/utils/Throttler.scala | 37 +- .../scala/unit/kafka/admin/AdminTest.scala | 2 - .../unit/kafka/log/FileMessageSetTest.scala | 79 ++- .../scala/unit/kafka/log/LogManagerTest.scala | 52 +- .../scala/unit/kafka/log/LogSegmentTest.scala | 107 ++- .../test/scala/unit/kafka/log/LogTest.scala | 318 +++++---- .../message/BaseMessageSetTestCases.scala | 2 +- .../kafka/producer/AsyncProducerTest.scala | 1 + .../unit/kafka/server/SimpleFetchTest.scala | 58 +- .../scala/unit/kafka/utils/TestUtils.scala | 1 - .../unit/kafka/zk/EmbeddedZookeeper.scala | 3 +- .../unit/kafka/zk/ZooKeeperTestHarness.scala | 6 +- .../main/java/kafka/examples/Consumer.java | 1 - 26 files changed, 1046 insertions(+), 788 deletions(-) diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 7e7f344f4f0..52840268bbb 100644 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -29,54 +29,83 @@ import java.util.concurrent.TimeUnit import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} /** - * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts - * will fail on an immutable message set. An optional limit and start position can be applied to the message set - * which will control the position in the file at which the set begins. + * An on-disk message set. An optional start and end position can be applied to the message set + * which will allow slicing a subset of the file. + * @param file The file name for the underlying log data + * @param channel the underlying file channel used + * @param start A lower bound on the absolute position in the file from which the message set begins + * @param end The upper bound on the absolute position in the file at which the message set ends + * @param isSlice Should the start and end parameters be used for slicing? 
*/ @nonthreadsafe class FileMessageSet private[kafka](val file: File, private[log] val channel: FileChannel, - private[log] val start: Int = 0, - private[log] val limit: Int = Int.MaxValue, - initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging { + private[log] val start: Int, + private[log] val end: Int, + isSlice: Boolean) extends MessageSet with Logging { /* the size of the message set in bytes */ - private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start) + private val _size = + if(isSlice) + new AtomicInteger(end - start) // don't check the file size if this is just a slice view + else + new AtomicInteger(math.min(channel.size().toInt, end) - start) - if (initChannelPositionToEnd) { - /* set the file position to the last byte in the file */ + /* if this is not a slice, update the file pointer to the end of the file */ + if (!isSlice) channel.position(channel.size) - } /** - * Create a file message set with no limit or offset + * Create a file message set with no slicing. */ - def this(file: File, channel: FileChannel) = this(file, channel, 0, Int.MaxValue) + def this(file: File, channel: FileChannel) = + this(file, channel, start = 0, end = Int.MaxValue, isSlice = false) /** - * Create a file message set with no limit or offset + * Create a file message set with no slicing */ - def this(file: File) = this(file, Utils.openChannel(file, mutable = true)) + def this(file: File) = + this(file, Utils.openChannel(file, mutable = true)) + + /** + * Create a slice view of the file message set that begins and ends at the given byte offsets + */ + def this(file: File, channel: FileChannel, start: Int, end: Int) = + this(file, channel, start, end, isSlice = true) /** * Return a message set which is a view into this set starting from the given position and with the given size limit. + * + * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read. + * + * If this message set is already sliced, the position will be taken relative to that slicing. + * + * @param position The start position to begin the read from + * @param size The number of bytes after the start position to include + * + * @return A sliced wrapper on this message set limited based on the given position and size */ def read(position: Int, size: Int): FileMessageSet = { + if(position < 0) + throw new IllegalArgumentException("Invalid position: " + position) + if(size < 0) + throw new IllegalArgumentException("Invalid size: " + size) new FileMessageSet(file, channel, - this.start + position, - scala.math.min(this.start + position + size, sizeInBytes()), - false) + start = this.start + position, + end = math.min(this.start + position + size, sizeInBytes())) } /** * Search forward for the file position of the last offset that is great than or equal to the target offset * and return its physical position. If no such offsets are found, return null. + * @param targetOffset The offset to search for. + * @param startingPosition The starting position in the file to begin searching from. 
*/ - private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = { + def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = { var position = startingPosition val buffer = ByteBuffer.allocate(MessageSet.LogOverhead) - val size = _size.get() + val size = sizeInBytes() while(position + MessageSet.LogOverhead < size) { buffer.rewind() channel.read(buffer, position) @@ -94,19 +123,34 @@ class FileMessageSet private[kafka](val file: File, } /** - * Write some of this set to the given channel, return the amount written + * Write some of this set to the given channel. + * @param destChannel The channel to write to. + * @param writePosition The position in the message set to begin writing from. + * @param size The maximum number of bytes to write + * @return The number of bytes actually written. */ def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = - channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt + channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt /** - * Get an iterator over the messages in the set. We only do shallow iteration here. + * Get a shallow iterator over the messages in the set. */ - override def iterator: Iterator[MessageAndOffset] = { + override def iterator() = iterator(Int.MaxValue) + + /** + * Get an iterator over the messages in the set. We only do shallow iteration here. + * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory. + * If we encounter a message larger than this we throw an InvalidMessageException. + * @return The iterator. + */ + def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = { new IteratorTemplate[MessageAndOffset] { var location = start override def makeNext(): MessageAndOffset = { + if(location >= end) + return allDone() + // read the size of the item val sizeOffsetBuffer = ByteBuffer.allocate(12) channel.read(sizeOffsetBuffer, location) @@ -116,8 +160,10 @@ class FileMessageSet private[kafka](val file: File, sizeOffsetBuffer.rewind() val offset = sizeOffsetBuffer.getLong() val size = sizeOffsetBuffer.getInt() - if (size < Message.MinHeaderSize) + if(size < Message.MinHeaderSize) return allDone() + if(size > maxMessageSize) + throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize)) // read the item itself val buffer = ByteBuffer.allocate(size) @@ -139,7 +185,7 @@ class FileMessageSet private[kafka](val file: File, def sizeInBytes(): Int = _size.get() /** - * Append this message to the message set + * Append these messages to the message set */ def append(messages: ByteBufferMessageSet) { val written = messages.writeTo(channel, 0, messages.sizeInBytes) @@ -150,9 +196,7 @@ class FileMessageSet private[kafka](val file: File, * Commit all written data to the physical disk */ def flush() = { - LogFlushStats.logFlushTimer.time { - channel.force(true) - } + channel.force(true) } /** @@ -165,6 +209,7 @@ class FileMessageSet private[kafka](val file: File, /** * Delete this message set from the filesystem + * @return True iff this message set was deleted. */ def delete(): Boolean = { Utils.swallow(channel.close()) @@ -172,13 +217,14 @@ class FileMessageSet private[kafka](val file: File, } /** - * Truncate this file message set to the given size. Note that this API does no checking that the - * given size falls on a valid byte offset. 
+ * Truncate this file message set to the given size in bytes. Note that this API does no checking that the + * given size falls on a valid message boundary. + * @param targetSize The size to truncate to. */ def truncateTo(targetSize: Int) = { - if(targetSize > sizeInBytes()) - throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) + - " size of this log segment is only %d bytes".format(sizeInBytes())) + if(targetSize > sizeInBytes || targetSize < 0) + throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " + + " size of this log segment is " + sizeInBytes + " bytes.") channel.truncate(targetSize) channel.position(targetSize) _size.set(targetSize) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 4cb244550ed..a12833f7fd2 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -17,76 +17,17 @@ package kafka.log -import kafka.api.OffsetRequest import java.io.{IOException, File} -import java.util.{Comparator, Collections, ArrayList} +import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ import kafka.utils._ -import scala.math._ +import scala.collection.JavaConversions.asIterable; import java.text.NumberFormat -import kafka.server.BrokerTopicStat import kafka.message._ import kafka.common._ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge -object Log { - val LogFileSuffix = ".log" - val IndexFileSuffix = ".index" - - /** - * Search for the greatest range with start <= the target value. - */ - def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = { - if(ranges.size < 1) - return None - - // check out of bounds - if(value < ranges(0).start) - return None - - var low = 0 - var high = arraySize - 1 - while(low < high) { - val mid = ceil((high + low) / 2.0).toInt - val found = ranges(mid) - if(found.start == value) - return Some(found) - else if (value < found.start) - high = mid - 1 - else - low = mid - } - Some(ranges(low)) - } - - def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] = - findRange(ranges, value, ranges.length) - - /** - * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros - * so that ls sorts the files numerically - */ - def filenamePrefixFromOffset(offset: Long): String = { - val nf = NumberFormat.getInstance() - nf.setMinimumIntegerDigits(20) - nf.setMaximumFractionDigits(0) - nf.setGroupingUsed(false) - nf.format(offset) - } - - def logFilename(dir: File, offset: Long) = - new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix) - - def indexFilename(dir: File, offset: Long) = - new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) - - def getEmptyOffsets(timestamp: Long): Seq[Long] = - if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime) - Seq(0L) - else Nil -} - /** * An append-only log for storing messages. @@ -96,19 +37,26 @@ object Log { * New log segments are created according to a configurable policy that controls the size in bytes or time interval * for a given segment. * + * @param dir The directory in which log segments are created. + * @param maxSegmentSize The maximum segment size in bytes. + * @param maxMessageSize The maximum message size in bytes (including headers) that will be allowed in this log. 
+ * @param flushInterval The number of messages that can be appended to this log before we force a flush of the log. + * @param rollIntervalMs The time after which we will force the rolling of a new log segment + * @param needsRecovery Should we run recovery on this log when opening it? This should be done if the log wasn't cleanly shut down. + * @param maxIndexSize The maximum size of an offset index in this log. The index of the active log segment will be pre-allocated to this size. + * @param indexIntervalBytes The (approximate) number of bytes between entries in the offset index for this log. + * */ @threadsafe -private[kafka] class Log(val dir: File, - val maxLogFileSize: Int, - val maxMessageSize: Int, - val flushInterval: Int = Int.MaxValue, - val rollIntervalMs: Long = Long.MaxValue, - val needsRecovery: Boolean, - val maxIndexSize: Int = (10*1024*1024), - val indexIntervalBytes: Int = 4096, - time: Time = SystemTime, - brokerId: Int = 0) extends Logging with KafkaMetricsGroup { - this.logIdent = "[Kafka Log on Broker " + brokerId + "], " +class Log(val dir: File, + val maxSegmentSize: Int, + val maxMessageSize: Int, + val flushInterval: Int = Int.MaxValue, + val rollIntervalMs: Long = Long.MaxValue, + val needsRecovery: Boolean, + val maxIndexSize: Int = (10*1024*1024), + val indexIntervalBytes: Int = 4096, + time: Time = SystemTime) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ @@ -119,13 +67,13 @@ private[kafka] class Log(val dir: File, private val unflushed = new AtomicInteger(0) /* last time it was flushed */ - private val lastflushedTime = new AtomicLong(System.currentTimeMillis) + private val lastflushedTime = new AtomicLong(time.milliseconds) /* the actual segments of the log */ - private[log] val segments: SegmentList[LogSegment] = loadSegments() + private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments() /* Calculate the offset of the next message */ - private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset()) + private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset()) newGauge(name + "-" + "NumLogSegments", new Gauge[Int] { def getValue = numberOfSegments }) @@ -133,13 +81,13 @@ private[kafka] class Log(val dir: File, newGauge(name + "-" + "LogEndOffset", new Gauge[Long] { def getValue = logEndOffset }) - /* The name of this log */ + /** The name of this log */ def name = dir.getName() /* Load the log segments from the log files on disk */ - private def loadSegments(): SegmentList[LogSegment] = { + private def loadSegments(): ConcurrentNavigableMap[Long, LogSegment] = { // open all the segments read-only - val logSegments = new ArrayList[LogSegment] + val logSegments = new ConcurrentSkipListMap[Long, LogSegment] val ls = dir.listFiles() if(ls != null) { for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) { @@ -147,75 +95,46 @@ private[kafka] class Log(val dir: File, throw new IOException("Could not read file " + file) val filename = file.getName() val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong - // TODO: we should ideally rebuild any missing index files, instead of erroring out - if(!Log.indexFilename(dir, start).exists) - throw new IllegalStateException("Found log file with no corresponding index file.") - logSegments.add(new LogSegment(dir = dir, - startOffset = start, - indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize)) + val hasIndex = Log.indexFilename(dir, start).exists + val segment = new LogSegment(dir 
= dir, + startOffset = start, + indexIntervalBytes = indexIntervalBytes, + maxIndexSize = maxIndexSize) + if(!hasIndex) { + // this can only happen if someone manually deletes the index file + error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) + segment.recover(maxMessageSize) + } + logSegments.put(start, segment) } } if(logSegments.size == 0) { - // no existing segments, create a new mutable segment - logSegments.add(new LogSegment(dir = dir, + // no existing segments, create a new mutable segment beginning at offset 0 + logSegments.put(0, + new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = indexIntervalBytes, maxIndexSize = maxIndexSize)) } else { - // there is at least one existing segment, validate and recover them/it - // sort segments into ascending order for fast searching - Collections.sort(logSegments, new Comparator[LogSegment] { - def compare(s1: LogSegment, s2: LogSegment): Int = { - if(s1.start == s2.start) 0 - else if(s1.start < s2.start) -1 - else 1 - } - }) + // reset the index size of the currently active log segment to allow more entries + val active = logSegments.lastEntry.getValue + active.index.resize(maxIndexSize) - // reset the index size of the last (current active) log segment to its maximum value - logSegments.get(logSegments.size() - 1).index.resize(maxIndexSize) - - // run recovery on the last segment if necessary - if(needsRecovery) - recoverSegment(logSegments.get(logSegments.size - 1)) - } - new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size))) - } - - /** - * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log. - */ - private def recoverSegment(segment: LogSegment) { - segment.index.truncate() - var validBytes = 0 - var lastIndexEntry = 0 - val iter = segment.messageSet.iterator - try { - while(iter.hasNext) { - val entry = iter.next - entry.message.ensureValid() - if(validBytes - lastIndexEntry > indexIntervalBytes) { - segment.index.append(entry.offset, validBytes) - lastIndexEntry = validBytes - } - validBytes += MessageSet.entrySize(entry.message) + // run recovery on the active segment if necessary + if(needsRecovery) { + info("Recovering active segment of %s.".format(name)) + active.recover(maxMessageSize) } - } catch { - case e: InvalidMessageException => - logger.warn("Found invalid messages in log " + name) } - val truncated = segment.messageSet.sizeInBytes - validBytes - if(truncated > 0) - warn("Truncated " + truncated + " invalid bytes from the log " + name + ".") - segment.messageSet.truncateTo(validBytes) + logSegments } /** - * The number of segments in the log + * The number of segments in the log. + * Take care! this is an O(n) operation. */ - def numberOfSegments: Int = segments.view.length + def numberOfSegments: Int = segments.size /** * Close this log @@ -223,7 +142,7 @@ private[kafka] class Log(val dir: File, def close() { debug("Closing log " + name) lock synchronized { - for(seg <- segments.view) + for(seg <- logSegments) seg.close() } } @@ -234,78 +153,81 @@ private[kafka] class Log(val dir: File, * This method will generally be responsible for assigning offsets to the messages, * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid. 
* - * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set, - * or (-1,-1) if the message set is empty + * @param messages The message set to append + * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given + * + * @throws KafkaStorageException If the append fails due to an I/O error. + * + * @return Information about the appended messages including the first and last offset */ - def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = { - val messageSetInfo = analyzeAndValidateMessageSet(messages) + def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = { + val appendInfo = analyzeAndValidateMessageSet(messages) // if we have any valid messages, append them to the log - if(messageSetInfo.count == 0) { - (-1L, -1L) - } else { - BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count) - BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count) + if(appendInfo.count == 0) + return appendInfo + + // trim any invalid bytes or partial messages before appending it to the on-disk log + var validMessages = trimInvalidBytes(messages) - // trim any invalid bytes or partial messages before appending it to the on-disk log - var validMessages = trimInvalidBytes(messages) - - try { - // they are valid, insert them in the log - val offsets = lock synchronized { - // maybe roll the log if this segment is full - val segment = maybeRoll(segments.view.last) + try { + // they are valid, insert them in the log + lock synchronized { + // maybe roll the log if this segment is full + val segment = maybeRoll() - // assign offsets to the messageset - val offsets = - if(assignOffsets) { - val firstOffset = nextOffset.get - validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec) - val lastOffset = nextOffset.get - 1 - (firstOffset, lastOffset) - } else { - if(!messageSetInfo.offsetsMonotonic) - throw new IllegalArgumentException("Out of order offsets found in " + messages) - nextOffset.set(messageSetInfo.lastOffset + 1) - (messageSetInfo.firstOffset, messageSetInfo.lastOffset) - } - - // now append to the log - trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s" - .format(this.name, offsets._1, nextOffset.get(), validMessages)) - segment.append(offsets._1, validMessages) - - // return the offset at which the messages were appended - offsets + if(assignOffsets) { + // assign offsets to the messageset + appendInfo.firstOffset = nextOffset.get + validMessages = validMessages.assignOffsets(nextOffset, appendInfo.codec) + appendInfo.lastOffset = nextOffset.get - 1 + } else { + // we are taking the offsets we are given + if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get) + throw new IllegalArgumentException("Out of order offsets found in " + messages) + nextOffset.set(appendInfo.lastOffset + 1) } - + + // now append to the log + trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset)) + segment.append(appendInfo.firstOffset, validMessages) + // maybe flush the log and index - maybeFlush(messageSetInfo.count) + maybeFlush(appendInfo.count) - // return the first and last offset - offsets - } catch { - case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e) + appendInfo } + } catch { + case e: IOException => throw new 
KafkaStorageException("I/O exception in append to log '%s'".format(name), e) } } - /* struct to hold various quantities we compute about each message set before appending to the log */ - case class MessageSetAppendInfo(firstOffset: Long, lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean) + /** Struct to hold various quantities we compute about each message set before appending to the log + * @param firstOffset The first offset in the message set + * @param lastOffset The last offset in the message set + * @param codec The codec used in the message set + * @param count The number of messages + * @param offsetsMonotonic Are the offsets in this message set monotonically increasing + */ + case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean) /** * Validate the following: - * 1. each message is not too large - * 2. each message matches its CRC + *
+ * <ol>
+ * <li> each message is not too large
+ * <li> each message matches its CRC
+ * </ol>
 *
 * Also compute the following quantities:
- * 1. First offset in the message set
- * 2. Last offset in the message set
- * 3. Number of messages
- * 4. Whether the offsets are monotonically increasing
- * 5. Whether any compression codec is used (if many are used, then the last one is given)
+ * <ol>
+ * <li> First offset in the message set
+ * <li> Last offset in the message set
+ * <li> Number of messages
+ * <li> Whether the offsets are monotonically increasing
+ * <li> Whether any compression codec is used (if many are used, then the last one is given)
+ * </ol>
*/ - private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): MessageSetAppendInfo = { + private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = { var messageCount = 0 var firstOffset, lastOffset = -1L var codec: CompressionCodec = NoCompressionCodec @@ -332,11 +254,13 @@ private[kafka] class Log(val dir: File, if(messageCodec != NoCompressionCodec) codec = messageCodec } - MessageSetAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic) + LogAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic) } /** * Trim any invalid bytes from the end of this message set (if there are any) + * @param messages The message set to trim + * @return A trimmed message set. This may be the same as what was passed in or it may not. */ private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = { val messageSetValidBytes = messages.validBytes @@ -353,118 +277,131 @@ private[kafka] class Log(val dir: File, } /** - * Read a message set from the log. - * startOffset - The logical offset to begin reading at - * maxLength - The maximum number of bytes to read - * maxOffset - The maximum logical offset to include in the resulting message set + * Read messages from the log + * @param startOffset The offset to begin reading at + * @param maxLength The maximum number of bytes to read + * @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set). + * + * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment. + * @return The messages read */ def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = { trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size)) - val view = segments.view - + // check if the offset is valid and in range - val first = view.head.start val next = nextOffset.get if(startOffset == next) return MessageSet.Empty - else if(startOffset > next || startOffset < first) - throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, first, next)) - // Do the read on the segment with a base offset less than the target offset - // TODO: to handle sparse offsets, we need to skip to the next segment if this read doesn't find anything - Log.findRange(view, startOffset, view.length) match { - case None => throw new OffsetOutOfRangeException("Offset is earlier than the earliest offset") - case Some(segment) => segment.read(startOffset, maxLength, maxOffset) + var entry = segments.floorEntry(startOffset) + + // attempt to read beyond the log end offset is an error + if(startOffset > next || entry == null) + throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next)) + + // do the read on the segment with a base offset less than the target offset + // but if that segment doesn't contain any messages with an offset greater than that + // continue to read from successive segments until we get some messages or we reach the end of the log + while(entry != null) { + val messages = entry.getValue.read(startOffset, maxOffset, maxLength) + if(messages == null) + entry = segments.higherEntry(entry.getKey) + else + return messages } + + // okay we are beyond the end of the last segment but less than the log end offset + MessageSet.Empty } 
/** - * Delete any log segments matching the given predicate function + * Delete any log segments matching the given predicate function, + * starting with the oldest segment and moving forward until a segment doesn't match. + * @param predicate A function that takes in a single log segment and returns true iff it is deletable + * @return The number of segments deleted */ - def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = { - lock synchronized { - val view = segments.view - val deletable = view.takeWhile(predicate) - for(seg <- deletable) - seg.deleted = true - var numToDelete = deletable.size - // if we are deleting everything, create a new empty segment - if(numToDelete == view.size) { - if (view(numToDelete - 1).size > 0) + def deleteOldSegments(predicate: LogSegment => Boolean): Int = { + // find any segments that match the user-supplied predicate UNLESS it is the final segment + // and it is empty (since we would just end up re-creating it + val lastSegment = activeSegment + var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0)) + val numToDelete = deletable.size + if(numToDelete > 0) { + lock synchronized { + // we must always have at least one segment, so if we are going to delete all the segments, create a new one first + if(segments.size == numToDelete) roll() - else { - // If the last segment to be deleted is empty and we roll the log, the new segment will have the same - // file name. So simply reuse the last segment and reset the modified time. - view(numToDelete - 1).messageSet.file.setLastModified(time.milliseconds) - numToDelete -=1 - } + // remove the segments for lookups + deletable.foreach(d => segments.remove(d.baseOffset)) } - segments.trunc(numToDelete) + // do not lock around actual file deletion, it isn't O(1) on many filesystems + deletable.foreach(_.delete()) } + numToDelete } /** - * Get the size of the log in bytes + * The size of the log in bytes */ - def size: Long = segments.view.foldLeft(0L)(_ + _.size) + def size: Long = logSegments.map(_.size).sum /** - * Get the offset of the next message that will be appended + * The offset of the next message that will be appended to the log */ def logEndOffset: Long = nextOffset.get /** - * Roll the log over if necessary + * Roll the log over to a new empty log segment if necessary + * @return The currently active segment after (perhaps) rolling to a new segment */ - private def maybeRoll(segment: LogSegment): LogSegment = { - if ((segment.messageSet.sizeInBytes > maxLogFileSize) || - ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) || + private def maybeRoll(): LogSegment = { + val segment = activeSegment + if ((segment.size > maxSegmentSize) || + (segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) || segment.index.isFull) roll() else segment } - + /** - * Create a new segment and make it active, and return it + * Roll the log over to a new active segment starting with the current logEndOffset. + * This will trim the index to the exact size of the number of entries it currently contains. 
+ * @return The newly rolled segment */ def roll(): LogSegment = { lock synchronized { - flush() - rollToOffset(logEndOffset) - } - } + // flush the log to ensure that only the active segment needs to be recovered + if(!segments.isEmpty()) + flush() - /** - * Roll the log over to the given new offset value - */ - private def rollToOffset(newOffset: Long): LogSegment = { - val logFile = logFilename(dir, newOffset) - val indexFile = indexFilename(dir, newOffset) - for(file <- List(logFile, indexFile); if file.exists) { - warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") - file.delete() + val newOffset = logEndOffset + val logFile = logFilename(dir, newOffset) + val indexFile = indexFilename(dir, newOffset) + for(file <- List(logFile, indexFile); if file.exists) { + warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") + file.delete() + } + + debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName) + segments.lastEntry() match { + case null => + case entry => entry.getValue.index.trimToValidSize() + } + val segment = new LogSegment(dir, + startOffset = newOffset, + indexIntervalBytes = indexIntervalBytes, + maxIndexSize = maxIndexSize) + val prev = segments.put(segment.baseOffset, segment) + if(prev != null) + throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset)) + segment } - debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName) - segments.view.lastOption match { - case Some(segment) => segment.index.trimToValidSize() - case None => - } - - val segmentsView = segments.view - if(segmentsView.size > 0 && segmentsView.last.start == newOffset) - throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset)) - - val segment = new LogSegment(dir, - startOffset = newOffset, - indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize) - segments.append(segment) - segment } /** * Flush the log if necessary + * @param numberOfMessages The number of messages that are being appended */ private def maybeFlush(numberOfMessages : Int) { if(unflushed.addAndGet(numberOfMessages) >= flushInterval) @@ -472,144 +409,128 @@ private[kafka] class Log(val dir: File, } /** - * Flush this log file to the physical disk + * Flush this log file and assoicated index to the physical disk */ def flush() : Unit = { if (unflushed.get == 0) return + debug("Flushing log '" + name + "' last flushed: " + lastFlushTime + " current time: " + + time.milliseconds + " unflushed = " + unflushed.get) lock synchronized { - debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " + - time.milliseconds) - segments.view.last.flush() + activeSegment.flush() unflushed.set(0) lastflushedTime.set(time.milliseconds) } } - def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = { - val segsArray = segments.view - var offsetTimeArray: Array[(Long, Long)] = null - if(segsArray.last.size > 0) - offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1) - else - offsetTimeArray = new Array[(Long, Long)](segsArray.length) - - for(i <- 0 until segsArray.length) - offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified) - if(segsArray.last.size > 0) - offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds) - 
- var startIndex = -1 - timestamp match { - case OffsetRequest.LatestTime => - startIndex = offsetTimeArray.length - 1 - case OffsetRequest.EarliestTime => - startIndex = 0 - case _ => - var isFound = false - debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) - startIndex = offsetTimeArray.length - 1 - while (startIndex >= 0 && !isFound) { - if (offsetTimeArray(startIndex)._2 <= timestamp) - isFound = true - else - startIndex -=1 - } - } - - val retSize = maxNumOffsets.min(startIndex + 1) - val ret = new Array[Long](retSize) - for(j <- 0 until retSize) { - ret(j) = offsetTimeArray(startIndex)._1 - startIndex -= 1 - } - // ensure that the returned seq is in descending order of offsets - ret.toSeq.sortBy(- _) - } - + /** + * Delete this log from the filesystem entirely + */ def delete(): Unit = { - deleteSegments(segments.contents.get()) + logSegments.foreach(_.delete()) Utils.rm(dir) } - - - /* Attempts to delete all provided segments from a log and returns how many it was able to */ - def deleteSegments(segments: Seq[LogSegment]): Int = { - var total = 0 - for(segment <- segments) { - info("Deleting log segment " + segment.start + " from " + name) - val deletedLog = segment.messageSet.delete() - val deletedIndex = segment.index.delete() - if(!deletedIndex || !deletedLog) { - throw new KafkaStorageException("Deleting log segment " + segment.start + " failed.") - } else { - total += 1 - } - } - total - } + /** + * Truncate this log so that it ends with the greatest offset < targetOffset. + * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete. + */ def truncateTo(targetOffset: Long) { + info("Truncating log %s to offset %d.".format(name, targetOffset)) if(targetOffset < 0) throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset)) + if(targetOffset > logEndOffset) { + info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1)) + return + } lock synchronized { - val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset) - val viewSize = segments.view.size - val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted) - /* We should not hit this error because segments.view is locked in markedDeletedWhile() */ - if(numSegmentsDeleted != segmentsToBeDeleted.size) - error("Failed to delete some segments when attempting to truncate to offset " + targetOffset +")") - if (numSegmentsDeleted == viewSize) { - segments.trunc(segments.view.size) - rollToOffset(targetOffset) - this.nextOffset.set(targetOffset) + if(segments.firstEntry.getValue.baseOffset > targetOffset) { + truncateFullyAndStartAt(targetOffset) } else { - if(targetOffset > logEndOffset) { - error("Target offset %d cannot be greater than the last message offset %d in the log %s". 
- format(targetOffset, logEndOffset, segments.view.last.messageSet.file.getAbsolutePath)) - } else { - // find the log segment that has this hw - val segmentToBeTruncated = findRange(segments.view, targetOffset) - segmentToBeTruncated match { - case Some(segment) => - val truncatedSegmentIndex = segments.view.indexOf(segment) - segments.truncLast(truncatedSegmentIndex) - segment.truncateTo(targetOffset) - this.nextOffset.set(targetOffset) - info("Truncated log segment %s to target offset %d".format(segments.view.last.messageSet.file.getAbsolutePath, targetOffset)) - case None => // nothing to do - } - } + val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) + deletable.foreach(s => segments.remove(s.baseOffset)) + deletable.foreach(_.delete()) + activeSegment.truncateTo(targetOffset) + this.nextOffset.set(targetOffset) } } } - /** - * Truncate all segments in the log and start a new segment on a new offset + /** + * Delete all data in the log and start at the new offset + * @param newOffset The new offset to start the log with */ - def truncateAndStartWithNewOffset(newOffset: Long) { + def truncateFullyAndStartAt(newOffset: Long) { + debug("Truncate and start log '" + name + "' to " + newOffset) lock synchronized { - val deletedSegments = segments.trunc(segments.view.size) - debug("Truncate and start log '" + name + "' to " + newOffset) - segments.append(new LogSegment(dir, - newOffset, - indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize)) - deleteSegments(deletedSegments) + val segmentsToDelete = logSegments.toList + segments.clear() + segmentsToDelete.foreach(_.delete()) + segments.put(newOffset, + new LogSegment(dir, + newOffset, + indexIntervalBytes = indexIntervalBytes, + maxIndexSize = maxIndexSize)) this.nextOffset.set(newOffset) } } - def topicName():String = { - name.substring(0, name.lastIndexOf("-")) - } - - def getLastFlushedTime():Long = { - return lastflushedTime.get - } + /** + * The time this log is last known to have been fully flushed to disk + */ + def lastFlushTime(): Long = lastflushedTime.get + + /** + * The active segment that is currently taking appends + */ + def activeSegment = segments.lastEntry.getValue + + /** + * All the log segments in this log ordered from oldest to newest + */ + def logSegments: Iterable[LogSegment] = asIterable(segments.values) override def toString() = "Log(" + this.dir + ")" } + +/** + * Helper functions for logs + */ +object Log { + val LogFileSuffix = ".log" + val IndexFileSuffix = ".index" + + /** + * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros + * so that ls sorts the files numerically. 
+ * @param offset The offset to use in the file name + * @return The filename + */ + def filenamePrefixFromOffset(offset: Long): String = { + val nf = NumberFormat.getInstance() + nf.setMinimumIntegerDigits(20) + nf.setMaximumFractionDigits(0) + nf.setGroupingUsed(false) + nf.format(offset) + } + + /** + * Construct a log file name in the given dir with the given base offset + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + */ + def logFilename(dir: File, offset: Long) = + new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix) + + /** + * Construct an index file name in the given dir using the given base offset + * @param dir The directory in which the log will reside + * @param offset The base offset of the log file + */ + def indexFilename(dir: File, offset: Long) = + new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) + +} diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 1d4f885a096..69ef5ea0a7c 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -20,7 +20,6 @@ package kafka.log import java.io._ import kafka.utils._ import scala.collection._ -import kafka.log.Log._ import kafka.common.{TopicAndPartition, KafkaException} import kafka.server.KafkaConfig @@ -36,9 +35,9 @@ import kafka.server.KafkaConfig * A background thread handles log retention by periodically truncating excess log segments. */ @threadsafe -private[kafka] class LogManager(val config: KafkaConfig, - scheduler: KafkaScheduler, - private val time: Time) extends Logging { +class LogManager(val config: KafkaConfig, + scheduler: KafkaScheduler, + private val time: Time) extends Logging { val CleanShutdownFile = ".kafka_cleanshutdown" val LockFile = ".lock" @@ -62,9 +61,12 @@ private[kafka] class LogManager(val config: KafkaConfig, loadLogs(logDirs) /** - * 1. Ensure that there are no duplicates in the directory list - * 2. Create each directory if it doesn't exist - * 3. Check that each path is a readable directory + * Create and check validity of the given directories, specifically: + *
+ * <ol>
+ * <li> Ensure that there are no duplicates in the directory list
+ * <li> Create each directory if it doesn't exist
+ * <li> Check that each path is a readable directory
+ * </ol>
*/ private def createAndValidateLogDirs(dirs: Seq[File]) { if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size) @@ -95,7 +97,7 @@ private[kafka] class LogManager(val config: KafkaConfig, } /** - * Recovery and load all logs in the given data directories + * Recover and load all logs in the given data directories */ private def loadLogs(dirs: Seq[File]) { for(dir <- dirs) { @@ -120,8 +122,7 @@ private[kafka] class LogManager(val config: KafkaConfig, needsRecovery, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, - time, - config.brokerId) + time) val previous = this.logs.put(topicPartition, log) if(previous != null) throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) @@ -132,7 +133,7 @@ private[kafka] class LogManager(val config: KafkaConfig, } /** - * Start the log flush thread + * Start the background threads to flush logs and do log cleanup */ def startup() { /* Schedule the cleanup task to delete old logs */ @@ -140,14 +141,17 @@ private[kafka] class LogManager(val config: KafkaConfig, info("Starting log cleaner every " + logCleanupIntervalMs + " ms") scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false) info("Starting log flusher every " + config.flushSchedulerThreadRate + - " ms with the following overrides " + logFlushIntervals) - scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-", - config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false) + " ms with the following overrides " + logFlushIntervals) + scheduler.scheduleWithRate(flushDirtyLogs, + "kafka-logflusher-", + config.flushSchedulerThreadRate, + config.flushSchedulerThreadRate, + isDaemon = false) } } /** - * Get the log if it exists + * Get the log if it exists, otherwise return None */ def getLog(topic: String, partition: Int): Option[Log] = { val topicAndPartiton = TopicAndPartition(topic, partition) @@ -159,7 +163,7 @@ private[kafka] class LogManager(val config: KafkaConfig, } /** - * Create the log if it does not exist, if it exists just return it + * Create the log if it does not exist, otherwise just return it */ def getOrCreateLog(topic: String, partition: Int): Log = { val topicAndPartition = TopicAndPartition(topic, partition) @@ -195,8 +199,7 @@ private[kafka] class LogManager(val config: KafkaConfig, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, - time, - config.brokerId) + time) info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath)) logs.put(topicAndPartition, log) log @@ -223,14 +226,6 @@ private[kafka] class LogManager(val config: KafkaConfig, } } - def getOffsets(topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { - val log = getLog(topicAndPartition.topic, topicAndPartition.partition) - log match { - case Some(l) => l.getOffsetsBefore(timestamp, maxNumOffsets) - case None => getEmptyOffsets(timestamp) - } - } - /** * Runs through the log removing segments older than a certain age */ @@ -238,9 +233,7 @@ private[kafka] class LogManager(val config: KafkaConfig, val startMs = time.milliseconds val topic = parseTopicPartitionName(log.name).topic val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs) - val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs) - val total = 
log.deleteSegments(toBeDeleted) - total + log.deleteOldSegments(startMs - _.lastModified > logCleanupThresholdMs) } /** @@ -250,7 +243,8 @@ private[kafka] class LogManager(val config: KafkaConfig, private def cleanupSegmentsToMaintainSize(log: Log): Int = { val topic = parseTopicPartitionName(log.dir.getName).topic val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize) - if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0 + if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) + return 0 var diff = log.size - maxLogRetentionSize def shouldDelete(segment: LogSegment) = { if(diff - segment.size >= 0) { @@ -260,9 +254,7 @@ private[kafka] class LogManager(val config: KafkaConfig, false } } - val toBeDeleted = log.markDeletedWhile( shouldDelete ) - val total = log.deleteSegments(toBeDeleted) - total + log.deleteOldSegments(shouldDelete) } /** @@ -307,19 +299,20 @@ private[kafka] class LogManager(val config: KafkaConfig, */ private def flushDirtyLogs() = { debug("Checking for dirty logs to flush...") - for (log <- allLogs) { + for ((topicAndPartition, log) <- logs) { try { - val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime + val timeSinceLastFlush = time.milliseconds - log.lastFlushTime + var logFlushInterval = config.defaultFlushIntervalMs - if(logFlushIntervals.contains(log.topicName)) - logFlushInterval = logFlushIntervals(log.topicName) - debug(log.topicName + " flush interval " + logFlushInterval + - " last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush) + if(logFlushIntervals.contains(topicAndPartition.topic)) + logFlushInterval = logFlushIntervals(topicAndPartition.topic) + debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + logFlushInterval + + " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush) if(timeSinceLastFlush >= logFlushInterval) log.flush } catch { case e => - error("Error flushing topic " + log.topicName, e) + error("Error flushing topic " + topicAndPartition.topic, e) e match { case _: IOException => fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e) @@ -330,11 +323,12 @@ private[kafka] class LogManager(val config: KafkaConfig, } } + /** + * Parse the topic and partition out of the directory name of a log + */ private def parseTopicPartitionName(name: String): TopicAndPartition = { val index = name.lastIndexOf('-') TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) } - def topics(): Iterable[String] = logs.keys.map(_.topic) - } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 4417cffd05d..237cfc41d95 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -3,6 +3,7 @@ package kafka.log import scala.math._ import java.io.File import kafka.message._ +import kafka.common._ import kafka.utils._ /** @@ -12,24 +13,24 @@ import kafka.utils._ * any previous segment. * * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. 
+ * + * @param log The message set containing log entries + * @param index The offset index + * @param baseOffset A lower bound on the offsets in this segment + * @param indexIntervalBytes The approximate number of bytes between entries in the index + * @param time The time instance */ @nonthreadsafe -class LogSegment(val messageSet: FileMessageSet, +class LogSegment(val log: FileMessageSet, val index: OffsetIndex, - val start: Long, + val baseOffset: Long, val indexIntervalBytes: Int, - time: Time) extends Range with Logging { + time: Time) extends Logging { - var firstAppendTime: Option[Long] = - if (messageSet.sizeInBytes > 0) - Some(time.milliseconds) - else - None + var created = time.milliseconds /* the number of bytes since we last added an entry in the offset index */ - var bytesSinceLastIndexEntry = 0 - - @volatile var deleted = false + private var bytesSinceLastIndexEntry = 0 def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) = this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), @@ -39,49 +40,62 @@ class LogSegment(val messageSet: FileMessageSet, SystemTime) /* Return the size in bytes of this log segment */ - def size: Long = messageSet.sizeInBytes() - - def updateFirstAppendTime() { - if (firstAppendTime.isEmpty) - firstAppendTime = Some(time.milliseconds) - } - + def size: Long = log.sizeInBytes() + /** * Append the given messages starting with the given offset. Add * an entry to the index if needed. * - * It is assumed this method is being called from within a lock + * It is assumed this method is being called from within a lock. + * + * @param offset The first offset in the message set. + * @param messages The messages to append. */ + @nonthreadsafe def append(offset: Long, messages: ByteBufferMessageSet) { if (messages.sizeInBytes > 0) { - trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, messageSet.sizeInBytes())) + trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes())) // append an entry to the index (if needed) if(bytesSinceLastIndexEntry > indexIntervalBytes) { - index.append(offset, messageSet.sizeInBytes()) + index.append(offset, log.sizeInBytes()) this.bytesSinceLastIndexEntry = 0 } // append the messages - messageSet.append(messages) - updateFirstAppendTime() + log.append(messages) this.bytesSinceLastIndexEntry += messages.sizeInBytes } } /** - * Find the physical file position for the least offset >= the given offset. If no offset is found - * that meets this criteria before the end of the log, return null. + * Find the physical file position for the first message with offset >= the requested offset. + * + * The lowerBound argument is an optimization that can be used if we already know a valid starting position + * in the file higher than the greast-lower-bound from the index. + * + * @param offset The offset we want to translate + * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and + * when omitted, the search will begin at the position in the offset index. + * + * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria. 
*/ - private def translateOffset(offset: Long): OffsetPosition = { + @threadsafe + private def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = { val mapping = index.lookup(offset) - messageSet.searchFor(offset, mapping.position) + log.searchFor(offset, max(mapping.position, startingFilePosition)) } /** - * Read a message set from this segment beginning with the first offset - * greater than or equal to the startOffset. The message set will include + * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. + * + * @param startOffset A lower bound on the first offset to include in the message set we read + * @param maxSize The maximum number of bytes to include in the message set we read + * @param maxOffset An optional maximum offset for the message set we read + * + * @return The message set read or null if the startOffset is larger than the largest offset in this log. */ - def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = { + @threadsafe + def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = { if(maxSize < 0) throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize)) if(maxSize == 0) @@ -89,9 +103,9 @@ class LogSegment(val messageSet: FileMessageSet, val startPosition = translateOffset(startOffset) - // if the start position is already off the end of the log, return MessageSet.Empty + // if the start position is already off the end of the log, return null if(startPosition == null) - return MessageSet.Empty + return null // calculate the length of the message set to read based on whether or not they gave us a maxOffset val length = @@ -103,23 +117,58 @@ class LogSegment(val messageSet: FileMessageSet, // there is a max offset, translate it to a file position and use that to calculate the max read size if(offset < startOffset) throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset)) - val mapping = translateOffset(offset) + val mapping = translateOffset(offset, startPosition.position) val endPosition = if(mapping == null) - messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file + log.sizeInBytes() // the max offset is off the end of the log, use the end of the file else mapping.position min(endPosition - startPosition.position, maxSize) } } - messageSet.read(startPosition.position, length) + log.read(startPosition.position, length) + } + + /** + * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log. + * + * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this + * is corrupt. 
+ */ + @nonthreadsafe + def recover(maxMessageSize: Int) { + index.truncate() + var validBytes = 0 + var lastIndexEntry = 0 + val iter = log.iterator(maxMessageSize) + try { + while(iter.hasNext) { + val entry = iter.next + entry.message.ensureValid() + if(validBytes - lastIndexEntry > indexIntervalBytes) { + index.append(entry.offset, validBytes) + lastIndexEntry = validBytes + } + validBytes += MessageSet.entrySize(entry.message) + } + } catch { + case e: InvalidMessageException => + logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage)) + } + val truncated = log.sizeInBytes - validBytes + if(truncated > 0) + warn("Truncated " + truncated + " invalid bytes from the log segment %s.".format(log.file.getAbsolutePath)) + log.truncateTo(validBytes) } - override def toString() = "LogSegment(start=" + start + ", size=" + size + ")" + override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")" /** - * Truncate off all index and log entries with offsets greater than or equal to the current offset. + * Truncate off all index and log entries with offsets >= the given offset. + * If the given offset is larger than the largest message in this segment, do nothing. + * @param offset The offset to truncate to */ + @nonthreadsafe def truncateTo(offset: Long) { val mapping = translateOffset(offset) if(mapping == null) @@ -127,29 +176,37 @@ class LogSegment(val messageSet: FileMessageSet, index.truncateTo(offset) // after truncation, reset and allocate more space for the (new currently active) index index.resize(index.maxIndexSize) - messageSet.truncateTo(mapping.position) - if (messageSet.sizeInBytes == 0) - firstAppendTime = None + log.truncateTo(mapping.position) + if (log.sizeInBytes == 0) + created = time.milliseconds } /** * Calculate the offset that would be used for the next message to be append to this segment. - * Not that this is expensive. + * Note that this is expensive. */ + @threadsafe def nextOffset(): Long = { - val ms = read(index.lastOffset, messageSet.sizeInBytes, None) - ms.lastOption match { - case None => start - case Some(last) => last.nextOffset + val ms = read(index.lastOffset, None, log.sizeInBytes) + if(ms == null) { + baseOffset + } else { + ms.lastOption match { + case None => baseOffset + case Some(last) => last.nextOffset + } } } /** * Flush this log segment to disk */ + @threadsafe def flush() { - messageSet.flush() - index.flush() + LogFlushStats.logFlushTimer.time { + log.flush() + index.flush() + } } /** @@ -157,7 +214,25 @@ class LogSegment(val messageSet: FileMessageSet, */ def close() { Utils.swallow(index.close) - Utils.swallow(messageSet.close) + Utils.swallow(log.close) } + /** + * Delete this log segment from the filesystem. + * @throws KafkaStorageException if the delete fails. 
+ */ + def delete() { + val deletedLog = log.delete() + val deletedIndex = index.delete() + if(!deletedLog && log.file.exists) + throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.") + if(!deletedIndex && index.file.exists) + throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.") + } + + /** + * The last modified time of this log segment as a unix time stamp + */ + def lastModified = log.file.lastModified + } \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 23adca13b3a..0e18f28d7fa 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -51,7 +51,7 @@ import kafka.utils._ */ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { - /* the memory mapping */ + /* initialize the memory mapping for this index */ private var mmap: MappedByteBuffer = { val newlyCreated = file.createNewFile() @@ -84,10 +84,12 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = } } - /* the maximum number of entries this index can hold */ + /** + * The maximum number of eight-byte entries this index can hold + */ def maxEntries = mmap.limit / 8 - /* the number of entries in the index */ + /* the number of eight-byte entries currently in the index */ private var size = new AtomicInteger(mmap.position / 8) /* the last offset in the index */ @@ -108,6 +110,10 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = /** * Find the largest offset less than or equal to the given targetOffset * and return a pair holding this offset and it's corresponding physical file position. + * + * @param targetOffset The offset to look up. + * + * @return The offset found and the corresponding file position for this offset. * If the target offset is smaller than the least entry in the index (or the index is empty), * the pair (baseOffset, 0) is returned. */ @@ -123,7 +129,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = /** * Find the slot in which the largest offset less than or equal to the given * target offset is stored. - * Return -1 if the least entry in the index is larger than the target offset or the index is empty + * + * @param idx The index buffer + * @param targetOffset The offset to look for + * + * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty */ private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = { // we only store the difference from the baseoffset so calculate that @@ -161,6 +171,8 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = /** * Get the nth offset mapping from the index + * @param n The entry number in the index + * @return The offset/position pair at that entry */ def entry(n: Int): OffsetPosition = { if(n >= entries) @@ -170,7 +182,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = } /** - * Append entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries. + * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries. 
*/ def append(offset: Long, position: Int) { this synchronized { @@ -192,7 +204,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = def isFull: Boolean = entries >= this.maxEntries /** - * Truncate the entire index + * Truncate the entire index, deleting all entries */ def truncate() = truncateTo(this.baseOffset) diff --git a/core/src/main/scala/kafka/log/package.html b/core/src/main/scala/kafka/log/package.html index 0880be724de..d8521a1abbc 100644 --- a/core/src/main/scala/kafka/log/package.html +++ b/core/src/main/scala/kafka/log/package.html @@ -1 +1,6 @@ -The log management system for Kafka. \ No newline at end of file +The log management system for Kafka. + +The entry point for this system is LogManager. LogManager is responsible for holding all the logs, and handing them out by topic/partition. It also handles the enforcement of the +flush policy and retention policies. + +The Log itself is made up of log segments. A log is a FileMessageSet that contains the data and an OffsetIndex that supports reads by offset on the log. \ No newline at end of file diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index 7ef92ab3a18..a1b5c63b4d9 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -92,15 +92,23 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { } /** - * Print this message set's contents + * Print this message set's contents. If the message set has more than 100 messages, just + * print the first 100. */ override def toString: String = { val builder = new StringBuilder() builder.append(getClass.getSimpleName + "(") - for(message <- this) { + val iter = this.iterator + var i = 0 + while(iter.hasNext && i < 100) { + val message = iter.next builder.append(message) - builder.append(", ") + if(iter.hasNext) + builder.append(", ") + i += 1 } + if(iter.hasNext) + builder.append("...") builder.append(")") builder.toString } diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 2c9f2d15746..72f68dc7983 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -75,14 +75,14 @@ class ProducerConfig private (val props: VerifiableProperties) val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null)) /** - * The producer using the zookeeper software load balancer maintains a ZK cache that gets - * updated by the zookeeper watcher listeners. During some events like a broker bounce, the - * producer ZK cache can get into an inconsistent state, for a small time period. In this time - * period, it could end up picking a broker partition that is unavailable. When this happens, the - * ZK cache needs to be updated. - * This parameter specifies the number of times the producer attempts to refresh this ZK cache. + * If a request fails it is possible to have the producer automatically retry. This is controlled by this setting. + * Note that not all errors mean that the message was lost--for example if the network connection is lost we will + * get a socket exception--in this case enabling retries can result in duplicate messages. 
*/ val producerRetries = props.getInt("producer.num.retries", 3) + /** + * The amount of time to wait in between retries + */ val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a14e0a2f49b..eff627c066c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -21,6 +21,7 @@ import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.api._ import kafka.message._ import kafka.network._ +import kafka.log._ import kafka.utils.{Pool, SystemTime, Logging} import org.apache.log4j.Logger import scala.collection._ @@ -59,7 +60,7 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) - case requestId => throw new KafkaException("No mapping found for handler id " + requestId) + case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { case e: Throwable => @@ -243,12 +244,17 @@ class KafkaApis(val requestChannel: RequestChannel, try { val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) val log = localReplica.log.get - val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true) + val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true) + + // update stats + BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).messagesInRate.mark(info.count) + BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(info.count) + // we may need to increment high watermark since ISR could be down to 1 localReplica.partition.maybeIncrementLeaderHW(localReplica) trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" - .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end)) - ProduceResult(topicAndPartition, start, end) + .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) + ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset) } catch { case e: KafkaStorageException => fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) @@ -358,11 +364,11 @@ class KafkaApis(val requestChannel: RequestChannel, else replicaManager.getLeaderReplicaIfLocal(topic, partition) trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) - val maxOffsetOpt = if (fromReplicaId == Request.OrdinaryConsumerId) { - Some(localReplica.highWatermark) - } else { - None - } + val maxOffsetOpt = + if (fromReplicaId == Request.OrdinaryConsumerId) + Some(localReplica.highWatermark) + else + None val messages = localReplica.log match { case Some(log) => log.read(offset, maxSize, maxOffsetOpt) @@ -391,15 +397,18 @@ class KafkaApis(val requestChannel: RequestChannel, else replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition) val offsets = { - val allOffsets = replicaManager.logManager.getOffsets(topicAndPartition, - partitionOffsetRequestInfo.time, - partitionOffsetRequestInfo.maxNumOffsets) - if (!offsetRequest.isFromOrdinaryClient) allOffsets - else { + val allOffsets = fetchOffsets(replicaManager.logManager, + topicAndPartition, + 
partitionOffsetRequestInfo.time, + partitionOffsetRequestInfo.maxNumOffsets) + if (!offsetRequest.isFromOrdinaryClient) { + allOffsets + } else { val hw = localReplica.highWatermark if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw) - else allOffsets + else + allOffsets } } (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets)) @@ -412,6 +421,59 @@ class KafkaApis(val requestChannel: RequestChannel, val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } + + def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { + logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match { + case Some(log) => + fetchOffsetsBefore(log, timestamp, maxNumOffsets) + case None => + if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime) + Seq(0L) + else + Nil + } + } + + def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { + val segsArray = log.logSegments.toArray + var offsetTimeArray: Array[(Long, Long)] = null + if(segsArray.last.size > 0) + offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1) + else + offsetTimeArray = new Array[(Long, Long)](segsArray.length) + + for(i <- 0 until segsArray.length) + offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified) + if(segsArray.last.size > 0) + offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds) + + var startIndex = -1 + timestamp match { + case OffsetRequest.LatestTime => + startIndex = offsetTimeArray.length - 1 + case OffsetRequest.EarliestTime => + startIndex = 0 + case _ => + var isFound = false + debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) + startIndex = offsetTimeArray.length - 1 + while (startIndex >= 0 && !isFound) { + if (offsetTimeArray(startIndex)._2 <= timestamp) + isFound = true + else + startIndex -=1 + } + } + + val retSize = maxNumOffsets.min(startIndex + 1) + val ret = new Array[Long](retSize) + for(j <- 0 until retSize) { + ret(j) = offsetTimeArray(startIndex)._1 + startIndex -= 1 + } + // ensure that the returned seq is in descending order of offsets + ret.toSeq.sortBy(- _) + } /** * Service the topic metadata request API diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index e0a86b966b9..27bd2884ef2 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -38,7 +38,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var logManager: LogManager = null var kafkaZookeeper: KafkaZooKeeper = null var replicaManager: ReplicaManager = null - private var apis: KafkaApis = null + var apis: KafkaApis = null var kafkaController: KafkaController = null val kafkaScheduler = new KafkaScheduler(4) var zkClient: ZkClient = null diff --git a/core/src/main/scala/kafka/server/LeaderElector.scala b/core/src/main/scala/kafka/server/LeaderElector.scala index a0b79e47c05..14b3fa4be8e 100644 --- a/core/src/main/scala/kafka/server/LeaderElector.scala +++ b/core/src/main/scala/kafka/server/LeaderElector.scala @@ -27,8 +27,6 @@ trait LeaderElector extends Logging { def amILeader : Boolean -// def electAndBecomeLeader: Unit -// def elect: Boolean def close diff --git 
a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 40afcabbe5e..8dc2b2abc87 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -66,7 +66,7 @@ class ReplicaFetcherThread(name:String, ) val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get - replica.log.get.truncateAndStartWithNewOffset(offset) + replica.log.get.truncateFullyAndStartAt(offset) offset } diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index 36a119bce7d..5a7a2df4b3a 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -54,7 +54,7 @@ import java.util.Properties; * The user need to provide the configuration file for 07 consumer and 08 producer. For 08 producer, * the "serializer.class" filed is set to "kafka.serializer.DefaultEncode" by the code. */ - +@SuppressWarnings({"unchecked", "rawtypes"}) public class KafkaMigrationTool { private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName()); diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala index 2bfa102a7d4..9e53b03c612 100644 --- a/core/src/main/scala/kafka/utils/Throttler.scala +++ b/core/src/main/scala/kafka/utils/Throttler.scala @@ -17,27 +17,24 @@ package kafka.utils; +import java.util.Random import scala.math._ -object Throttler extends Logging { - val DefaultCheckIntervalMs = 100L -} - /** * A class to measure and throttle the rate of some process. The throttler takes a desired rate-per-second * (the units of the process don't matter, it could be bytes or a count of some other thing), and will sleep for - * an appropraite amount of time when maybeThrottle() is called to attain the desired rate. + * an appropriate amount of time when maybeThrottle() is called to attain the desired rate. * * @param desiredRatePerSec: The rate we want to hit in units/sec * @param checkIntervalMs: The interval at which to check our rate * @param throttleDown: Does throttling increase or decrease our rate? 
* @param time: The time implementation to use */ -@nonthreadsafe +@threadsafe class Throttler(val desiredRatePerSec: Double, val checkIntervalMs: Long, val throttleDown: Boolean, - val time: Time) { + val time: Time) extends Logging { private val lock = new Object private var periodStartNs: Long = time.nanoseconds @@ -65,8 +62,7 @@ class Throttler(val desiredRatePerSec: Double, val elapsedMs = elapsedNs / Time.NsPerMs val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs) if(sleepTime > 0) { - Throttler.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec + - ", sleeping for " + sleepTime + " ms to compensate.") + println("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime)) time.sleep(sleepTime) } } @@ -77,3 +73,26 @@ class Throttler(val desiredRatePerSec: Double, } } + +object Throttler { + + val DefaultCheckIntervalMs = 100L + + def main(args: Array[String]) { + val rand = new Random() + val throttler = new Throttler(1000000, 100, true, SystemTime) + var start = System.currentTimeMillis + var total = 0 + while(true) { + val value = rand.nextInt(1000) + throttler.maybeThrottle(value) + total += value + val now = System.currentTimeMillis + if(now - start >= 1000) { + println(total) + start = now + total = 0 + } + } + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 17cabc7a3cf..7e17da4be6b 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -401,8 +401,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size) servers.foreach(_.shutdown()) - - } private def checkIfReassignPartitionPathExists(): Boolean = { diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala index d0044cf49c9..cec1caecc51 100644 --- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -35,13 +35,21 @@ class FileMessageSetTest extends BaseMessageSetTestCases { set } + /** + * Test that the cached size variable matches the actual file size as we append messages + */ @Test def testFileSize() { assertEquals(messageSet.channel.size, messageSet.sizeInBytes) - messageSet.append(singleMessageSet("abcd".getBytes())) - assertEquals(messageSet.channel.size, messageSet.sizeInBytes) + for(i <- 0 until 20) { + messageSet.append(singleMessageSet("abcd".getBytes)) + assertEquals(messageSet.channel.size, messageSet.sizeInBytes) + } } + /** + * Test that adding invalid bytes to the end of the log doesn't break iteration + */ @Test def testIterationOverPartialAndTruncation() { testPartialWrite(0, messageSet) @@ -62,6 +70,9 @@ class FileMessageSetTest extends BaseMessageSetTestCases { checkEquals(messages.iterator, messageSet.map(m => m.message).iterator) } + /** + * Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel. + */ @Test def testIterationDoesntChangePosition() { val position = messageSet.channel.position @@ -69,39 +80,71 @@ class FileMessageSetTest extends BaseMessageSetTestCases { assertEquals(position, messageSet.channel.position) } + /** + * Test a simple append and read. 
+ */ @Test def testRead() { - val read = messageSet.read(0, messageSet.sizeInBytes) + var read = messageSet.read(0, messageSet.sizeInBytes) checkEquals(messageSet.iterator, read.iterator) val items = read.iterator.toList val sec = items.tail.head - val read2 = messageSet.read(MessageSet.entrySize(sec.message), messageSet.sizeInBytes) - checkEquals(items.tail.iterator, read2.iterator) + read = messageSet.read(position = MessageSet.entrySize(sec.message), size = messageSet.sizeInBytes) + assertEquals("Try a read starting from the second message", items.tail, read.toList) + read = messageSet.read(MessageSet.entrySize(sec.message), MessageSet.entrySize(sec.message)) + assertEquals("Try a read of a single message starting from the second message", List(items.tail.head), read.toList) } + /** + * Test the MessageSet.searchFor API. + */ @Test def testSearch() { // append a new message with a high offset val lastMessage = new Message("test".getBytes) messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(50), lastMessage)) - var physicalOffset = 0 + var position = 0 assertEquals("Should be able to find the first message by its offset", - OffsetPosition(0L, physicalOffset), + OffsetPosition(0L, position), messageSet.searchFor(0, 0)) - physicalOffset += MessageSet.entrySize(messageSet.head.message) + position += MessageSet.entrySize(messageSet.head.message) assertEquals("Should be able to find second message when starting from 0", - OffsetPosition(1L, physicalOffset), + OffsetPosition(1L, position), messageSet.searchFor(1, 0)) assertEquals("Should be able to find second message starting from its offset", - OffsetPosition(1L, physicalOffset), - messageSet.searchFor(1, physicalOffset)) - physicalOffset += MessageSet.entrySize(messageSet.tail.head.message) - assertEquals("Should be able to find third message from a non-existant offset", - OffsetPosition(50L, physicalOffset), - messageSet.searchFor(3, physicalOffset)) - assertEquals("Should be able to find third message by correct offset", - OffsetPosition(50L, physicalOffset), - messageSet.searchFor(50, physicalOffset)) + OffsetPosition(1L, position), + messageSet.searchFor(1, position)) + position += MessageSet.entrySize(messageSet.tail.head.message) + MessageSet.entrySize(messageSet.tail.tail.head.message) + assertEquals("Should be able to find fourth message from a non-existant offset", + OffsetPosition(50L, position), + messageSet.searchFor(3, position)) + assertEquals("Should be able to find fourth message by correct offset", + OffsetPosition(50L, position), + messageSet.searchFor(50, position)) + } + + /** + * Test that the message set iterator obeys start and end slicing + */ + @Test + def testIteratorWithLimits() { + val message = messageSet.toList(1) + val start = messageSet.searchFor(1, 0).position + val size = message.message.size + val slice = messageSet.read(start, size) + assertEquals(List(message), slice.toList) + } + + /** + * Test the truncateTo method lops off messages and appropriately updates the size + */ + @Test + def testTruncate() { + val message = messageSet.toList(0) + val end = messageSet.searchFor(1, 0).position + messageSet.truncateTo(end) + assertEquals(List(message), messageSet.toList) + assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes) } } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index b06d8128805..d9f189e63e2 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -59,6 +59,9 @@ class LogManagerTest extends JUnit3Suite { super.tearDown() } + /** + * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log. + */ @Test def testCreateLog() { val log = logManager.getOrCreateLog(name, 0) @@ -67,29 +70,34 @@ class LogManagerTest extends JUnit3Suite { log.append(TestUtils.singleMessageSet("test".getBytes())) } + /** + * Test that get on a non-existent returns None and no log is created. + */ @Test - def testGetLog() { + def testGetNonExistentLog() { val log = logManager.getLog(name, 0) + assertEquals("No log should be found.", None, log) val logFile = new File(config.logDirs(0), name + "-0") assertTrue(!logFile.exists) } + /** + * Test time-based log cleanup. First append messages, then set the time into the future and run cleanup. + */ @Test def testCleanupExpiredSegments() { val log = logManager.getOrCreateLog(name, 0) var offset = 0L for(i <- 0 until 1000) { var set = TestUtils.singleMessageSet("test".getBytes()) - val (start, end) = log.append(set) - offset = end + val info = log.append(set) + offset = info.lastOffset } - log.flush assertTrue("There should be more than one segment now.", log.numberOfSegments > 1) // update the last modified time of all log segments - val logSegments = log.segments.view - logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs)) + log.logSegments.foreach(_.log.file.setLastModified(time.currentMs)) time.currentMs += maxLogAgeHours*60*60*1000 + 1 logManager.cleanupLogs() @@ -106,6 +114,9 @@ class LogManagerTest extends JUnit3Suite { log.append(TestUtils.singleMessageSet("test".getBytes())) } + /** + * Test size-based cleanup. Append messages, then run cleanup and check that segments are deleted. + */ @Test def testCleanupSegmentsToMaintainSize() { val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes @@ -130,11 +141,9 @@ class LogManagerTest extends JUnit3Suite { // add a bunch of messages that should be larger than the retentionSize for(i <- 0 until 1000) { val set = TestUtils.singleMessageSet("test".getBytes()) - val (start, end) = log.append(set) - offset = start + val info = log.append(set) + offset = info.firstOffset } - // flush to make sure it's written to disk - log.flush // should be exactly 100 full segments + 1 new empty one assertEquals("There should be example 100 segments.", 100, log.numberOfSegments) @@ -153,29 +162,33 @@ class LogManagerTest extends JUnit3Suite { log.append(TestUtils.singleMessageSet("test".getBytes())) } + /** + * Test that flush is invoked by the background scheduler thread. 
+   */
   @Test
   def testTimeBasedFlush() {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
-                   override val logFileSize = 1024 *1024 *1024
-                   override val flushSchedulerThreadRate = 50
+                   override val flushSchedulerThreadRate = 5
+                   override val defaultFlushIntervalMs = 5
                    override val flushInterval = Int.MaxValue
-                   override val logRollHours = maxRollInterval
-                   override val flushIntervalMap = Map("timebasedflush" -> 100)
                  }
-    logManager = new LogManager(config, scheduler, time)
+    logManager = new LogManager(config, scheduler, SystemTime)
     logManager.startup
     val log = logManager.getOrCreateLog(name, 0)
+    val lastFlush = log.lastFlushTime
     for(i <- 0 until 200) {
       var set = TestUtils.singleMessageSet("test".getBytes())
       log.append(set)
     }
-    val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
-    assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
-               ellapsed < 2*config.flushSchedulerThreadRate)
+    Thread.sleep(config.flushSchedulerThreadRate)
+    assertTrue("Time based flush should have been triggered", lastFlush != log.lastFlushTime)
   }
 
+  /**
+   * Test that new logs that are created are assigned to the least loaded log directory
+   */
   @Test
   def testLeastLoadedAssignment() {
     // create a log manager with multiple data directories
@@ -196,6 +209,9 @@ class LogManagerTest extends JUnit3Suite {
     }
   }
 
+  /**
+   * Test that it is not possible to open two log managers using the same data directory
+   */
   def testTwoLogManagersUsingSameDirFails() {
     try {
       new LogManager(logManager.config, scheduler, time)
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 86a30f32b13..bffe4a451a0 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -2,6 +2,9 @@ package kafka.log
 import junit.framework.Assert._
 import java.util.concurrent.atomic._
+import java.io.File
+import java.io.RandomAccessFile
+import java.util.Random
 import org.junit.{Test, After}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
@@ -13,6 +16,7 @@ class LogSegmentTest extends JUnit3Suite {
 
   val segments = mutable.ArrayBuffer[LogSegment]()
 
+  /* create a segment with the given base offset */
   def createSegment(offset: Long): LogSegment = {
     val msFile = TestUtils.tempFile()
     val ms = new FileMessageSet(msFile)
@@ -24,6 +28,7 @@ class LogSegmentTest extends JUnit3Suite {
     seg
   }
 
+  /* create a ByteBufferMessageSet for the given messages starting from the given offset */
   def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
     new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                              offsetCounter = new AtomicLong(offset),
@@ -34,17 +39,24 @@ class LogSegmentTest extends JUnit3Suite {
   def teardown() {
     for(seg <- segments) {
       seg.index.delete()
-      seg.messageSet.delete()
+      seg.log.delete()
     }
   }
 
+  /**
+   * A read on an empty log segment should return null
+   */
   @Test
   def testReadOnEmptySegment() {
     val seg = createSegment(40)
     val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None)
-    assertEquals(0, read.size)
+    assertNull("Read beyond the last offset in the segment should be null", read)
   }
 
+  /**
+   * Reading from before the first offset in the segment should return messages
+   * beginning with the first message in the segment
+   */
   @Test
   def testReadBeforeFirstOffset() {
     val seg = createSegment(40)
@@ -54,24 +66,40 @@ class LogSegmentTest extends JUnit3Suite {
     assertEquals(ms.toList, read.toList)
   }
 
+  /**
+   * If we set the startOffset and maxOffset for the read to be the same value,
+   * we should get only the first message in the log
+   */
   @Test
-  def testReadSingleMessage() {
-    val seg = createSegment(40)
-    val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
-    val read = seg.read(startOffset = 41, maxSize = 200, maxOffset = Some(50))
-    assertEquals(new Message("hello".getBytes), read.head.message)
+  def testMaxOffset() {
+    val baseOffset = 50
+    val seg = createSegment(baseOffset)
+    val ms = messages(baseOffset, "hello", "there", "beautiful")
+    seg.append(baseOffset, ms)
+    def validate(offset: Long) =
+      assertEquals(ms.filter(_.offset == offset).toList,
+                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).toList)
+    validate(50)
+    validate(51)
+    validate(52)
   }
 
+  /**
+   * If we read from an offset beyond the last offset in the segment we should get null
+   */
   @Test
   def testReadAfterLast() {
     val seg = createSegment(40)
     val ms = messages(50, "hello", "there")
     seg.append(50, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
-    assertEquals(0, read.size)
+    assertNull("Read beyond the last offset in the segment should give null", read)
   }
 
+  /**
+   * If we read from an offset which doesn't exist we should get a message set beginning
+   * with the least offset greater than the given startOffset.
+   */
   @Test
   def testReadFromGap() {
     val seg = createSegment(40)
@@ -83,6 +111,10 @@ class LogSegmentTest extends JUnit3Suite {
     assertEquals(ms2.toList, read.toList)
   }
 
+  /**
+   * In a loop append two messages then truncate off the second of those messages and check that we can read
+   * the first but not the second message.
+   */
   @Test
   def testTruncate() {
     val seg = createSegment(40)
@@ -93,26 +125,33 @@ class LogSegmentTest extends JUnit3Suite {
       val ms2 = messages(offset+1, "hello")
       seg.append(offset+1, ms2)
       // check that we can read back both messages
-      val read = seg.read(offset, 10000, None)
+      val read = seg.read(offset, None, 10000)
       assertEquals(List(ms1.head, ms2.head), read.toList)
       // now truncate off the last message
       seg.truncateTo(offset + 1)
-      val read2 = seg.read(offset, 10000, None)
+      val read2 = seg.read(offset, None, 10000)
       assertEquals(1, read2.size)
       assertEquals(ms1.head, read2.head)
       offset += 1
     }
   }
 
+  /**
+   * Test truncating the whole segment, and check that we can reappend with the original offset.
+   */
   @Test
   def testTruncateFull() {
     // test the case where we fully truncate the log
     val seg = createSegment(40)
     seg.append(40, messages(40, "hello", "there"))
     seg.truncateTo(0)
+    assertNull("Segment should be empty.", seg.read(0, None, 1024))
     seg.append(40, messages(40, "hello", "there"))
   }
 
+  /**
+   * Test that offsets are assigned sequentially and that the nextOffset variable is incremented
+   */
   @Test
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
@@ -121,4 +160,50 @@ class LogSegmentTest extends JUnit3Suite {
     assertEquals(53, seg.nextOffset())
   }
 
+  /**
+   * Create a segment with some data and an index. Then corrupt the index,
+   * and recover the segment; the entries should all be readable.
+ */ + @Test + def testRecoveryFixesCorruptIndex() { + val seg = createSegment(0) + for(i <- 0 until 100) + seg.append(i, messages(i, i.toString)) + val indexFile = seg.index.file + writeNonsense(indexFile, 5, indexFile.length.toInt) + seg.recover(64*1024) + for(i <- 0 until 100) + assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset) + } + + /** + * Randomly corrupt a log a number of times and attempt recovery. + */ + @Test + def testRecoveryWithCorruptMessage() { + val rand = new Random(1) + val messagesAppended = 20 + for(iteration <- 0 until 10) { + val seg = createSegment(0) + for(i <- 0 until messagesAppended) + seg.append(i, messages(i, i.toString)) + val offsetToBeginCorruption = rand.nextInt(messagesAppended) + // start corrupting somewhere in the middle of the chosen record all the way to the end + val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15) + writeNonsense(seg.log.file, position, seg.log.file.length.toInt - position) + seg.recover(64*1024) + assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList) + seg.delete() + } + } + + def writeNonsense(fileName: File, position: Long, size: Int) { + val file = new RandomAccessFile(fileName, "rw") + file.seek(position) + val rand = new Random + for(i <- 0 until size) + file.writeByte(rand.nextInt(255)) + file.close() + } + } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index afaa28474fe..ce44b0128dc 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -54,7 +54,10 @@ class LogTest extends JUnitSuite { } } - /** Test that the size and time based log segment rollout works. */ + /** + * Tests for time based log roll. This test appends messages then changes the time + * using the mock clock to force the log to roll and checks the number of segments. + */ @Test def testTimeBasedLogRoll() { val set = TestUtils.singleMessageSet("test".getBytes()) @@ -70,27 +73,26 @@ class LogTest extends JUnitSuite { assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) log.append(set) - assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) + assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments) - // segment expires in age - time.currentMs += rollMs + 1 - log.append(set) - assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments) + for(numSegments <- 2 until 4) { + time.currentMs += rollMs + 1 + log.append(set) + assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments) + } + val numSegments = log.numberOfSegments time.currentMs += rollMs + 1 - val blank = Array[Message]() - log.append(new ByteBufferMessageSet(new Message("blah".getBytes))) - assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments) - - time.currentMs += rollMs + 1 - // the last segment expired in age, but was blank. 
So new segment should not be generated
     log.append(new ByteBufferMessageSet())
-    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+    assertEquals("Appending an empty message set should not roll the log even if sufficient time has passed.", numSegments, log.numberOfSegments)
   }
 
+  /**
+   * Test that appending more than the maximum segment size rolls the log
+   */
   @Test
   def testSizeBasedLogRoll() {
-    val set = TestUtils.singleMessageSet("test".getBytes())
+    val set = TestUtils.singleMessageSet("test".getBytes)
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
@@ -106,28 +108,80 @@ class LogTest extends JUnitSuite {
     assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
   }
 
+  /**
+   * Test that we can open and append to an empty log
+   */
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-  }
-
-  @Test
-  def testAppendAndRead() {
     val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-    val message = new Message(Integer.toString(42).getBytes())
-    for(i <- 0 until 10)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
-    log.flush()
-    val messages = log.read(0, 1024)
-    var current = 0
-    for(curr <- messages) {
-      assertEquals("Read message should equal written", message, curr.message)
-      current += 1
-    }
-    assertEquals(10, current)
+    log.append(TestUtils.singleMessageSet("test".getBytes))
   }
 
+  /**
+   * This test case appends a bunch of messages and checks that we can read them all back using sequential offsets.
+   */
+  @Test
+  def testAppendAndReadWithSequentialOffsets() {
+    val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
+
+    for(i <- 0 until messages.length)
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
+    for(i <- 0 until messages.length) {
+      val read = log.read(i, 100, Some(i+1)).head
+      assertEquals("Offset read should match order appended.", i, read.offset)
+      assertEquals("Message should match appended.", messages(i), read.message)
+    }
+    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
+  }
+
+  /**
+   * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
+   * from any offset less than the logEndOffset including offsets not appended.
+ */ + @Test + def testAppendAndReadWithNonSequentialOffsets() { + val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray + val messages = messageIds.map(id => new Message(id.toString.getBytes)) + + // now test the case that we give the offsets and use non-sequential offsets + for(i <- 0 until messages.length) + log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false) + for(i <- 50 until messageIds.max) { + val idx = messageIds.indexWhere(_ >= i) + val read = log.read(i, 100, None).head + assertEquals("Offset read should match message id.", messageIds(idx), read.offset) + assertEquals("Message should match appended.", messages(idx), read.message) + } + } + + /** + * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment. + * Specifically we create a log where the last message in the first segment has offset 0. If we + * then read offset 1, we should expect this read to come from the second segment, even though the + * first segment has the greatest lower bound on the offset. + */ + @Test + def testReadAtLogGap() { + val log = new Log(logDir, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + + // keep appending until we have two segments with only a single message in the second segment + while(log.numberOfSegments == 1) + log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) + + // now manually truncate off all but one message from the first segment to create a gap in the messages + log.logSegments.head.truncateTo(1) + + assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset) + } + + /** + * Test reading at the boundary of the log, specifically + * - reading from the logEndOffset should give an empty message set + * - reading beyond the log end offset should throw an OffsetOutOfRangeException + */ @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) @@ -147,14 +201,17 @@ class LogTest extends JUnitSuite { } } - /** Test that writing and reading beyond the log size boundary works */ + /** + * Test that covers reads and writes on a multisegment log. This test appends a bunch of messages + * and then reads them all back and checks that the message read and offset matches what was appended. + */ @Test def testLogRolls() { /* create a multipart log with 100 messages */ val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes)) - val offsets = messageSets.map(log.append(_)._1) + val offsets = messageSets.map(log.append(_).firstOffset) log.flush /* do successive reads to ensure all our messages are there */ @@ -169,7 +226,9 @@ class LogTest extends JUnitSuite { assertEquals("Should be no more messages", 0, lastRead.size) } - /** Test the case where we have compressed batches of messages */ + /** + * Test reads at offsets that fall within compressed message set boundaries. 
+ */ @Test def testCompressedMessages() { /* this log should roll after every messageset */ @@ -187,66 +246,46 @@ class LogTest extends JUnitSuite { assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset) assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset) } - - @Test - def testFindSegment() { - assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45)) - assertEquals("Search in segment list just outside the range of the last segment should find last segment", - 9, Log.findRange(makeRanges(5, 9, 12), 12).get.start) - assertEquals("Search in segment list far outside the range of the last segment should find last segment", - 9, Log.findRange(makeRanges(5, 9, 12), 100).get.start) - assertEquals("Search in segment list far outside the range of the last segment should find last segment", - None, Log.findRange(makeRanges(5, 9, 12), -1)) - assertContains(makeRanges(5, 9, 12), 11) - assertContains(makeRanges(5), 4) - assertContains(makeRanges(5,8), 5) - assertContains(makeRanges(5,8), 6) - } + /** + * Test garbage collecting old segments + */ @Test - def testEdgeLogRollsStartingAtZero() { - // first test a log segment starting at 0 - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) - val curOffset = log.logEndOffset - assertEquals(curOffset, 0) + def testThatGarbageCollectingSegmentsDoesntChangeOffset() { + for(messagesToAppend <- List(0, 1, 25)) { + logDir.mkdirs() + // first test a log segment starting at 0 + val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + for(i <- 0 until messagesToAppend) + log.append(TestUtils.singleMessageSet(i.toString.getBytes)) + + var currOffset = log.logEndOffset + assertEquals(currOffset, messagesToAppend) - // time goes by; the log file is deleted - log.markDeletedWhile(_ => true) + // time goes by; the log file is deleted + log.deleteOldSegments(_ => true) - // we now have a new log; the starting offset of the new log should remain 0 - assertEquals(curOffset, log.logEndOffset) - log.delete() - } - - @Test - def testEdgeLogRollsStartingAtNonZero() { - // second test an empty log segment starting at non-zero - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) - val numMessages = 1 - for(i <- 0 until numMessages) - log.append(TestUtils.singleMessageSet(i.toString.getBytes)) - val curOffset = log.logEndOffset - - // time goes by; the log file is deleted - log.markDeletedWhile(_ => true) - - // we now have a new log - assertEquals(curOffset, log.logEndOffset) - - // time goes by; the log file (which is empty) is deleted again - val deletedSegments = log.markDeletedWhile(_ => true) - - // we shouldn't delete the last empty log segment. 
- assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0) - - // we now have a new log - assertEquals(curOffset, log.logEndOffset) + assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset) + assertEquals("We should still have one segment left", 1, log.numberOfSegments) + assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true)) + assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset) + assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", + currOffset, + log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset) + + // cleanup the log + log.delete() + } } + /** + * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the + * setting and checking that an exception is thrown. + */ @Test def testMessageSizeCheck() { - val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes())) - val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes())) + val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes)) + val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes)) // append messages to log val maxMessageSize = second.sizeInBytes - 1 @@ -259,10 +298,13 @@ class LogTest extends JUnitSuite { log.append(second) fail("Second message set should throw MessageSizeTooLargeException.") } catch { - case e:MessageSizeTooLargeException => // this is good + case e: MessageSizeTooLargeException => // this is good } } + /** + * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly. 
+ */ @Test def testLogRecoversToCorrectOffset() { val numMessages = 100 @@ -273,25 +315,53 @@ class LogTest extends JUnitSuite { for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize))) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) - val lastIndexOffset = log.segments.view.last.index.lastOffset - val numIndexEntries = log.segments.view.last.index.entries + val lastIndexOffset = log.activeSegment.index.lastOffset + val numIndexEntries = log.activeSegment.index.entries log.close() // test non-recovery case log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) - assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) - assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries) + assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) + assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) log.close() - // test + // test recovery case log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) - assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) - assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries) + assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) + assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) + log.close() + } + + /** + * Test that if we manually delete an index segment it is rebuilt when the log is re-opened + */ + @Test + def testIndexRebuild() { + // publish the messages and close the log + val numMessages = 200 + var log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096) + for(i <- 0 until numMessages) + log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10))) + val indexFiles = log.logSegments.map(_.index.file) + log.close() + + // delete all the index files + indexFiles.foreach(_.delete()) + + // reopen the log + log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096) + + assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) + for(i <- 0 until numMessages) + assertEquals(i, log.read(i, 100, None).head.offset) log.close() } + /** + * Test the Log truncate operations + */ @Test def testTruncateTo() { val set = TestUtils.singleMessageSet("test".getBytes()) @@ -329,7 +399,7 @@ class LogTest extends JUnitSuite { assertEquals("Should be back to original offset", 
log.logEndOffset, lastOffset) assertEquals("Should be back to original size", log.size, size) - log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1)) + log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1)) assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1)) assertEquals("Should change log size", log.size, 0) @@ -343,6 +413,9 @@ class LogTest extends JUnitSuite { assertEquals("Should change log size", log.size, 0) } + /** + * Verify that when we truncate a log the index of the last segment is resized to the max index size to allow more appends + */ @Test def testIndexResizingAtTruncation() { val set = TestUtils.singleMessageSet("test".getBytes()) @@ -357,48 +430,25 @@ class LogTest extends JUnitSuite { for (i<- 1 to msgPerSeg) log.append(set) assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments) - assertEquals("The index of the first segment should be trim to empty", 0, log.segments.view(0).index.maxEntries) + assertEquals("The index of the first segment should be trim to empty", 0, log.logSegments.toList(0).index.maxEntries) log.truncateTo(0) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) - assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.segments.view(0).index.maxEntries) + assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries) for (i<- 1 to msgPerSeg) log.append(set) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) } - - @Test - def testAppendWithoutOffsetAssignment() { - for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) { - logDir.mkdir() - var log = new Log(logDir, - maxLogFileSize = 64*1024, - maxMessageSize = config.maxMessageSize, - maxIndexSize = 1000, - indexIntervalBytes = 10000, - needsRecovery = true) - val messages = List("one", "two", "three", "four", "five", "six") - val ms = new ByteBufferMessageSet(compressionCodec = codec, - offsetCounter = new AtomicLong(5), - messages = messages.map(s => new Message(s.getBytes)):_*) - val firstOffset = ms.shallowIterator.toList.head.offset - val lastOffset = ms.shallowIterator.toList.last.offset - val (first, last) = log.append(ms, assignOffsets = false) - assertEquals(last + 1, log.logEndOffset) - assertEquals(firstOffset, first) - assertEquals(lastOffset, last) - assertTrue(log.read(5, 64*1024).size > 0) - log.delete() - } - } - + /** + * Verify that truncation works correctly after re-opening the log + */ @Test def testReopenThenTruncate() { val set = TestUtils.singleMessageSet("test".getBytes()) // create a log var log = new Log(logDir, - maxLogFileSize = set.sizeInBytes * 5, + maxSegmentSize = set.sizeInBytes * 5, maxMessageSize = config.maxMessageSize, maxIndexSize = 1000, indexIntervalBytes = 10000, @@ -409,7 +459,7 @@ class LogTest extends JUnitSuite { log.append(set) log.close() log = new Log(logDir, - maxLogFileSize = set.sizeInBytes * 5, + maxSegmentSize = set.sizeInBytes * 5, maxMessageSize = config.maxMessageSize, maxIndexSize = 1000, indexIntervalBytes = 10000, @@ -419,24 +469,4 @@ class LogTest extends JUnitSuite { assertEquals("Log end offset should be 3.", 3, log.logEndOffset) } - def assertContains(ranges: Array[Range], offset: Long) = { - Log.findRange(ranges, offset) match { - case Some(range) => - assertTrue(range + " does not contain " + offset, range.contains(offset)) - case None => fail("No range found, but expected to find " + offset) - 
} - } - - class SimpleRange(val start: Long, val size: Long) extends Range - - def makeRanges(breaks: Int*): Array[Range] = { - val list = new ArrayList[Range] - var prior = 0 - for(brk <- breaks) { - list.add(new SimpleRange(prior, brk - prior)) - prior = brk - } - list.toArray(new Array[Range](list.size)) - } - } diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala index ef74ba8f213..6db245c956d 100644 --- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala @@ -26,7 +26,7 @@ import org.junit.Test trait BaseMessageSetTestCases extends JUnitSuite { - val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes())) + val messages = Array(new Message("abcd".getBytes), new Message("efgh".getBytes), new Message("ijkl".getBytes)) def createMessageSet(messages: Seq[Message]): MessageSet diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 19f4c3b2421..7d77eb66fcc 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -397,6 +397,7 @@ class AsyncProducerTest extends JUnit3Suite { props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("producer.num.retries", 3.toString) val config = new ProducerConfig(props) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 3aae5cefa3c..95ecacc0ae0 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -40,7 +40,7 @@ class SimpleFetchTest extends JUnit3Suite { val partitionId = 0 /** - * The scenario for this test is that there is one topic, "test-topic", on broker "0" that has + * The scenario for this test is that there is one topic, "test-topic", one broker "0" that has * one partition with one follower replica on broker "1". The leader replica on "0" * has HW of "5" and LEO of "20". 
The follower on broker "1" has a local replica * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync @@ -62,6 +62,7 @@ class SimpleFetchTest extends JUnit3Suite { val log = EasyMock.createMock(classOf[kafka.log.Log]) EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() + EasyMock.expect(log) EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages)) EasyMock.replay(log) @@ -102,33 +103,6 @@ class SimpleFetchTest extends JUnit3Suite { // make sure the log only reads bytes between 0->HW (5) EasyMock.verify(log) - - // Test offset request from non-replica - val topicAndPartition = TopicAndPartition(topic, partition.partitionId) - val offsetRequest = OffsetRequest( - Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) - val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest) - - EasyMock.reset(logManager) - EasyMock.reset(replicaManager) - - EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get) - EasyMock.expect(replicaManager.logManager).andReturn(logManager) - EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo)) - - EasyMock.replay(replicaManager) - EasyMock.replay(logManager) - - apis.handleOffsetRequest(new RequestChannel.Request(processor = 0, - requestKey = 5, - buffer = offsetRequestBB, - startTimeMs = 1)) - val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer - val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer) - EasyMock.verify(replicaManager) - EasyMock.verify(logManager) - assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size) - assertEquals(hw.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head) } /** @@ -203,34 +177,6 @@ class SimpleFetchTest extends JUnit3Suite { * an offset of 15 */ EasyMock.verify(log) - - // Test offset request from replica - val topicAndPartition = TopicAndPartition(topic, partition.partitionId) - val offsetRequest = OffsetRequest( - Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)), - replicaId = followerReplicaId) - val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest) - - EasyMock.reset(logManager) - EasyMock.reset(replicaManager) - - EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get) - EasyMock.expect(replicaManager.logManager).andReturn(logManager) - EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo)) - - EasyMock.replay(replicaManager) - EasyMock.replay(logManager) - - apis.handleOffsetRequest(new RequestChannel.Request(processor = 1, - requestKey = 5, - buffer = offsetRequestBB, - startTimeMs = 1)) - val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer - val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer) - EasyMock.verify(replicaManager) - EasyMock.verify(logManager) - assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size) - assertEquals(leo.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head) } private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int, diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 3d4f3f2d4c4..6863565119b 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -127,7 +127,6 @@ object TestUtils extends Logging { props.put("hostname", "localhost") props.put("port", port.toString) props.put("log.dir", TestUtils.tempDir().getAbsolutePath) - props.put("log.flush.interval", "1") props.put("zk.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") props diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala index 6763357f027..d883bdeee1f 100644 --- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala +++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala @@ -33,7 +33,8 @@ class EmbeddedZookeeper(val connectString: String) { factory.startup(zookeeper) def shutdown() { - factory.shutdown() + Utils.swallow(zookeeper.shutdown()) + Utils.swallow(factory.shutdown()) Utils.rm(logDir) Utils.rm(snapshotDir) } diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 63e528f6e71..4e25b926d32 100644 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -19,7 +19,7 @@ package kafka.zk import org.scalatest.junit.JUnit3Suite import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZKStringSerializer, TestZKUtils} +import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils} trait ZooKeeperTestHarness extends JUnit3Suite { val zkConnect: String = TestZKUtils.zookeeperConnect @@ -36,8 +36,8 @@ trait ZooKeeperTestHarness extends JUnit3Suite { override def tearDown() { super.tearDown - zkClient.close() - zookeeper.shutdown() + Utils.swallow(zkClient.close()) + Utils.swallow(zookeeper.shutdown()) } } diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 2b875600ab0..a1d1ca237df 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -25,7 +25,6 @@ import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.Message; public class Consumer extends Thread