KAFKA-521 Refactor the log subsystem. Patch reviewed by Neha.

git-svn-id: https://svn.apache.org/repos/asf/kafka/trunk@1416253 13f79535-47bb-0310-9956-ffa450edef68
Edward Jay Kreps 2012-12-02 20:50:01 +00:00
parent 45ba1019cb
commit 4be0b1be29
26 changed files with 1046 additions and 788 deletions

View File

@ -29,54 +29,83 @@ import java.util.concurrent.TimeUnit
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
/**
* An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
* will fail on an immutable message set. An optional limit and start position can be applied to the message set
* which will control the position in the file at which the set begins.
* An on-disk message set. An optional start and end position can be applied to the message set
* which will allow slicing a subset of the file.
* @param file The file name for the underlying log data
* @param channel the underlying file channel used
* @param start A lower bound on the absolute position in the file from which the message set begins
* @param end The upper bound on the absolute position in the file at which the message set ends
* @param isSlice Should the start and end parameters be used for slicing?
*/
@nonthreadsafe
class FileMessageSet private[kafka](val file: File,
private[log] val channel: FileChannel,
private[log] val start: Int = 0,
private[log] val limit: Int = Int.MaxValue,
initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging {
private[log] val start: Int,
private[log] val end: Int,
isSlice: Boolean) extends MessageSet with Logging {
/* the size of the message set in bytes */
private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
private val _size =
if(isSlice)
new AtomicInteger(end - start) // don't check the file size if this is just a slice view
else
new AtomicInteger(math.min(channel.size().toInt, end) - start)
if (initChannelPositionToEnd) {
/* set the file position to the last byte in the file */
/* if this is not a slice, update the file pointer to the end of the file */
if (!isSlice)
channel.position(channel.size)
}
/**
* Create a file message set with no limit or offset
* Create a file message set with no slicing.
*/
def this(file: File, channel: FileChannel) = this(file, channel, 0, Int.MaxValue)
def this(file: File, channel: FileChannel) =
this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)
/**
* Create a file message set with no limit or offset
* Create a file message set with no slicing
*/
def this(file: File) = this(file, Utils.openChannel(file, mutable = true))
def this(file: File) =
this(file, Utils.openChannel(file, mutable = true))
/**
* Create a slice view of the file message set that begins and ends at the given byte offsets
*/
def this(file: File, channel: FileChannel, start: Int, end: Int) =
this(file, channel, start, end, isSlice = true)
/**
* Return a message set which is a view into this set starting from the given position and with the given size limit.
*
* If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
*
* If this message set is already sliced, the position will be taken relative to that slicing.
*
* @param position The start position to begin the read from
* @param size The number of bytes after the start position to include
*
* @return A sliced wrapper on this message set limited based on the given position and size
*/
def read(position: Int, size: Int): FileMessageSet = {
if(position < 0)
throw new IllegalArgumentException("Invalid position: " + position)
if(size < 0)
throw new IllegalArgumentException("Invalid size: " + size)
new FileMessageSet(file,
channel,
this.start + position,
scala.math.min(this.start + position + size, sizeInBytes()),
false)
start = this.start + position,
end = math.min(this.start + position + size, sizeInBytes()))
}
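To make the new slicing semantics concrete, here is a minimal standalone sketch (Slice is an illustrative stand-in for FileMessageSet, under the assumption that a nested read is clamped to its enclosing slice):
// Illustrative sketch only, not the Kafka class: a slice is a (start, end) byte range
// over the same underlying file, and read(position, size) is taken relative to it.
case class Slice(start: Int, end: Int) {
  def sizeInBytes: Int = end - start

  def read(position: Int, size: Int): Slice = {
    require(position >= 0, "Invalid position: " + position)
    require(size >= 0, "Invalid size: " + size)
    // clamp the new end to the enclosing slice so we never read past it
    Slice(start + position, math.min(start + position + size, end))
  }
}

object SliceDemo extends App {
  val file  = Slice(0, 1000)        // the whole file: bytes [0, 1000)
  val outer = file.read(100, 400)   // absolute bytes [100, 500)
  val inner = outer.read(50, 100)   // absolute bytes [150, 250), taken relative to outer
  println("outer=" + outer + " inner=" + inner)
}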
/**
* Search forward for the file position of the first offset that is greater than or equal to the target offset
* and return its physical position. If no such offsets are found, return null.
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
*/
private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
var position = startingPosition
val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
val size = _size.get()
val size = sizeInBytes()
while(position + MessageSet.LogOverhead < size) {
buffer.rewind()
channel.read(buffer, position)
@ -94,19 +123,34 @@ class FileMessageSet private[kafka](val file: File,
}
/**
* Write some of this set to the given channel, return the amount written
* Write some of this set to the given channel.
* @param destChannel The channel to write to.
* @param writePosition The position in the message set to begin writing from.
* @param size The maximum number of bytes to write
* @return The number of bytes actually written.
*/
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
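writeTo above leans on the JDK's FileChannel.transferTo, which lets the kernel move the file region without copying it through user space. A small hedged sketch of that primitive on its own (the helper name and arguments are invented for illustration):
import java.io.{File, FileInputStream}
import java.nio.channels.WritableByteChannel

object ZeroCopyDemo {
  // Copy `count` bytes of `file` starting at `position` into `dest` using the
  // same zero-copy primitive (FileChannel.transferTo) that writeTo relies on.
  def transferRegion(file: File, position: Long, count: Long, dest: WritableByteChannel): Long = {
    val channel = new FileInputStream(file).getChannel
    try channel.transferTo(position, math.min(count, channel.size - position), dest)
    finally channel.close()
  }
}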
/**
* Get an iterator over the messages in the set. We only do shallow iteration here.
* Get a shallow iterator over the messages in the set.
*/
override def iterator: Iterator[MessageAndOffset] = {
override def iterator() = iterator(Int.MaxValue)
/**
* Get an iterator over the messages in the set. We only do shallow iteration here.
* @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory.
* If we encounter a message larger than this we throw an InvalidMessageException.
* @return The iterator.
*/
def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] {
var location = start
override def makeNext(): MessageAndOffset = {
if(location >= end)
return allDone()
// read the size of the item
val sizeOffsetBuffer = ByteBuffer.allocate(12)
channel.read(sizeOffsetBuffer, location)
@ -116,8 +160,10 @@ class FileMessageSet private[kafka](val file: File,
sizeOffsetBuffer.rewind()
val offset = sizeOffsetBuffer.getLong()
val size = sizeOffsetBuffer.getInt()
if (size < Message.MinHeaderSize)
if(size < Message.MinHeaderSize)
return allDone()
if(size > maxMessageSize)
throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
// read the item itself
val buffer = ByteBuffer.allocate(size)
@ -139,7 +185,7 @@ class FileMessageSet private[kafka](val file: File,
def sizeInBytes(): Int = _size.get()
/**
* Append this message to the message set
* Append these messages to the message set
*/
def append(messages: ByteBufferMessageSet) {
val written = messages.writeTo(channel, 0, messages.sizeInBytes)
@ -150,9 +196,7 @@ class FileMessageSet private[kafka](val file: File,
* Commit all written data to the physical disk
*/
def flush() = {
LogFlushStats.logFlushTimer.time {
channel.force(true)
}
channel.force(true)
}
/**
@ -165,6 +209,7 @@ class FileMessageSet private[kafka](val file: File,
/**
* Delete this message set from the filesystem
* @return True iff this message set was deleted.
*/
def delete(): Boolean = {
Utils.swallow(channel.close())
@ -172,13 +217,14 @@ class FileMessageSet private[kafka](val file: File,
}
/**
* Truncate this file message set to the given size. Note that this API does no checking that the
* given size falls on a valid byte offset.
* Truncate this file message set to the given size in bytes. Note that this API does no checking that the
* given size falls on a valid message boundary.
* @param targetSize The size to truncate to.
*/
def truncateTo(targetSize: Int) = {
if(targetSize > sizeInBytes())
throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
" size of this log segment is only %d bytes".format(sizeInBytes()))
if(targetSize > sizeInBytes || targetSize < 0)
throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
" size of this log segment is " + sizeInBytes + " bytes.")
channel.truncate(targetSize)
channel.position(targetSize)
_size.set(targetSize)

View File

@ -17,76 +17,17 @@
package kafka.log
import kafka.api.OffsetRequest
import java.io.{IOException, File}
import java.util.{Comparator, Collections, ArrayList}
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
import java.util.concurrent.atomic._
import kafka.utils._
import scala.math._
import scala.collection.JavaConversions.asIterable;
import java.text.NumberFormat
import kafka.server.BrokerTopicStat
import kafka.message._
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
object Log {
val LogFileSuffix = ".log"
val IndexFileSuffix = ".index"
/**
* Search for the greatest range with start <= the target value.
*/
def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = {
if(ranges.size < 1)
return None
// check out of bounds
if(value < ranges(0).start)
return None
var low = 0
var high = arraySize - 1
while(low < high) {
val mid = ceil((high + low) / 2.0).toInt
val found = ranges(mid)
if(found.start == value)
return Some(found)
else if (value < found.start)
high = mid - 1
else
low = mid
}
Some(ranges(low))
}
def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] =
findRange(ranges, value, ranges.length)
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically
*/
def filenamePrefixFromOffset(offset: Long): String = {
val nf = NumberFormat.getInstance()
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
nf.format(offset)
}
def logFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
def indexFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
def getEmptyOffsets(timestamp: Long): Seq[Long] =
if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
Seq(0L)
else Nil
}
/**
* An append-only log for storing messages.
@ -96,19 +37,26 @@ object Log {
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
* for a given segment.
*
* @param dir The directory in which log segments are created.
* @param maxSegmentSize The maximum segment size in bytes.
* @param maxMessageSize The maximum message size in bytes (including headers) that will be allowed in this log.
* @param flushInterval The number of messages that can be appended to this log before we force a flush of the log.
* @param rollIntervalMs The time after which we will force the rolling of a new log segment
* @param needsRecovery Should we run recovery on this log when opening it? This should be done if the log wasn't cleanly shut down.
* @param maxIndexSize The maximum size of an offset index in this log. The index of the active log segment will be pre-allocated to this size.
* @param indexIntervalBytes The (approximate) number of bytes between entries in the offset index for this log.
*
*/
@threadsafe
private[kafka] class Log(val dir: File,
val maxLogFileSize: Int,
val maxMessageSize: Int,
val flushInterval: Int = Int.MaxValue,
val rollIntervalMs: Long = Long.MaxValue,
val needsRecovery: Boolean,
val maxIndexSize: Int = (10*1024*1024),
val indexIntervalBytes: Int = 4096,
time: Time = SystemTime,
brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
class Log(val dir: File,
val maxSegmentSize: Int,
val maxMessageSize: Int,
val flushInterval: Int = Int.MaxValue,
val rollIntervalMs: Long = Long.MaxValue,
val needsRecovery: Boolean,
val maxIndexSize: Int = (10*1024*1024),
val indexIntervalBytes: Int = 4096,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
@ -119,13 +67,13 @@ private[kafka] class Log(val dir: File,
private val unflushed = new AtomicInteger(0)
/* last time it was flushed */
private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
private val lastflushedTime = new AtomicLong(time.milliseconds)
/* the actual segments of the log */
private[log] val segments: SegmentList[LogSegment] = loadSegments()
private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments()
/* Calculate the offset of the next message */
private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
newGauge(name + "-" + "NumLogSegments",
new Gauge[Int] { def getValue = numberOfSegments })
@ -133,13 +81,13 @@ private[kafka] class Log(val dir: File,
newGauge(name + "-" + "LogEndOffset",
new Gauge[Long] { def getValue = logEndOffset })
/* The name of this log */
/** The name of this log */
def name = dir.getName()
/* Load the log segments from the log files on disk */
private def loadSegments(): SegmentList[LogSegment] = {
private def loadSegments(): ConcurrentNavigableMap[Long, LogSegment] = {
// open all the segments read-only
val logSegments = new ArrayList[LogSegment]
val logSegments = new ConcurrentSkipListMap[Long, LogSegment]
val ls = dir.listFiles()
if(ls != null) {
for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) {
@ -147,75 +95,46 @@ private[kafka] class Log(val dir: File,
throw new IOException("Could not read file " + file)
val filename = file.getName()
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
// TODO: we should ideally rebuild any missing index files, instead of erroring out
if(!Log.indexFilename(dir, start).exists)
throw new IllegalStateException("Found log file with no corresponding index file.")
logSegments.add(new LogSegment(dir = dir,
startOffset = start,
indexIntervalBytes = indexIntervalBytes,
maxIndexSize = maxIndexSize))
val hasIndex = Log.indexFilename(dir, start).exists
val segment = new LogSegment(dir = dir,
startOffset = start,
indexIntervalBytes = indexIntervalBytes,
maxIndexSize = maxIndexSize)
if(!hasIndex) {
// this can only happen if someone manually deletes the index file
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segment.recover(maxMessageSize)
}
logSegments.put(start, segment)
}
}
if(logSegments.size == 0) {
// no existing segments, create a new mutable segment
logSegments.add(new LogSegment(dir = dir,
// no existing segments, create a new mutable segment beginning at offset 0
logSegments.put(0,
new LogSegment(dir = dir,
startOffset = 0,
indexIntervalBytes = indexIntervalBytes,
maxIndexSize = maxIndexSize))
} else {
// there is at least one existing segment, validate and recover them/it
// sort segments into ascending order for fast searching
Collections.sort(logSegments, new Comparator[LogSegment] {
def compare(s1: LogSegment, s2: LogSegment): Int = {
if(s1.start == s2.start) 0
else if(s1.start < s2.start) -1
else 1
}
})
// reset the index size of the currently active log segment to allow more entries
val active = logSegments.lastEntry.getValue
active.index.resize(maxIndexSize)
// reset the index size of the last (current active) log segment to its maximum value
logSegments.get(logSegments.size() - 1).index.resize(maxIndexSize)
// run recovery on the last segment if necessary
if(needsRecovery)
recoverSegment(logSegments.get(logSegments.size - 1))
}
new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
}
/**
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
*/
private def recoverSegment(segment: LogSegment) {
segment.index.truncate()
var validBytes = 0
var lastIndexEntry = 0
val iter = segment.messageSet.iterator
try {
while(iter.hasNext) {
val entry = iter.next
entry.message.ensureValid()
if(validBytes - lastIndexEntry > indexIntervalBytes) {
segment.index.append(entry.offset, validBytes)
lastIndexEntry = validBytes
}
validBytes += MessageSet.entrySize(entry.message)
// run recovery on the active segment if necessary
if(needsRecovery) {
info("Recovering active segment of %s.".format(name))
active.recover(maxMessageSize)
}
} catch {
case e: InvalidMessageException =>
logger.warn("Found invalid messages in log " + name)
}
val truncated = segment.messageSet.sizeInBytes - validBytes
if(truncated > 0)
warn("Truncated " + truncated + " invalid bytes from the log " + name + ".")
segment.messageSet.truncateTo(validBytes)
logSegments
}
/**
* The number of segments in the log
* The number of segments in the log.
* Take care! This is an O(n) operation.
*/
def numberOfSegments: Int = segments.view.length
def numberOfSegments: Int = segments.size
/**
* Close this log
@ -223,7 +142,7 @@ private[kafka] class Log(val dir: File,
def close() {
debug("Closing log " + name)
lock synchronized {
for(seg <- segments.view)
for(seg <- logSegments)
seg.close()
}
}
@ -234,78 +153,81 @@ private[kafka] class Log(val dir: File,
* This method will generally be responsible for assigning offsets to the messages,
* however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
*
* Returns a tuple containing (first_offset, last_offset) for the newly appended message set,
* or (-1,-1) if the message set is empty
* @param messages The message set to append
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
*
* @throws KafkaStorageException If the append fails due to an I/O error.
*
* @return Information about the appended messages including the first and last offset
*/
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
val messageSetInfo = analyzeAndValidateMessageSet(messages)
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
val appendInfo = analyzeAndValidateMessageSet(messages)
// if we have any valid messages, append them to the log
if(messageSetInfo.count == 0) {
(-1L, -1L)
} else {
BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count)
BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count)
if(appendInfo.count == 0)
return appendInfo
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validMessages = trimInvalidBytes(messages)
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validMessages = trimInvalidBytes(messages)
try {
// they are valid, insert them in the log
val offsets = lock synchronized {
// maybe roll the log if this segment is full
val segment = maybeRoll(segments.view.last)
try {
// they are valid, insert them in the log
lock synchronized {
// maybe roll the log if this segment is full
val segment = maybeRoll()
// assign offsets to the messageset
val offsets =
if(assignOffsets) {
val firstOffset = nextOffset.get
validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec)
val lastOffset = nextOffset.get - 1
(firstOffset, lastOffset)
} else {
if(!messageSetInfo.offsetsMonotonic)
throw new IllegalArgumentException("Out of order offsets found in " + messages)
nextOffset.set(messageSetInfo.lastOffset + 1)
(messageSetInfo.firstOffset, messageSetInfo.lastOffset)
}
// now append to the log
trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
.format(this.name, offsets._1, nextOffset.get(), validMessages))
segment.append(offsets._1, validMessages)
// return the offset at which the messages were appended
offsets
if(assignOffsets) {
// assign offsets to the messageset
appendInfo.firstOffset = nextOffset.get
validMessages = validMessages.assignOffsets(nextOffset, appendInfo.codec)
appendInfo.lastOffset = nextOffset.get - 1
} else {
// we are taking the offsets we are given
if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
throw new IllegalArgumentException("Out of order offsets found in " + messages)
nextOffset.set(appendInfo.lastOffset + 1)
}
// now append to the log
trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
segment.append(appendInfo.firstOffset, validMessages)
// maybe flush the log and index
maybeFlush(messageSetInfo.count)
maybeFlush(appendInfo.count)
// return the first and last offset
offsets
} catch {
case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
appendInfo
}
} catch {
case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
}
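The offset-assignment branch above is the core of the new append path: a single counter hands out contiguous offsets so LogAppendInfo can report the first and last offset of the batch. A reduced sketch of just that step (OffsetAssigner and AppendResult are illustrative names, not Kafka classes):
import java.util.concurrent.atomic.AtomicLong

case class AppendResult(firstOffset: Long, lastOffset: Long)

class OffsetAssigner(startAt: Long) {
  private val nextOffset = new AtomicLong(startAt)

  // Reserve `messageCount` contiguous offsets and report the assigned range,
  // mirroring the firstOffset/lastOffset bookkeeping in LogAppendInfo.
  def assign(messageCount: Int): AppendResult = {
    require(messageCount > 0, "nothing to assign")
    val first = nextOffset.getAndAdd(messageCount)
    AppendResult(first, first + messageCount - 1)
  }
}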
/* struct to hold various quantities we compute about each message set before appending to the log */
case class MessageSetAppendInfo(firstOffset: Long, lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean)
/** Struct to hold various quantities we compute about each message set before appending to the log
* @param firstOffset The first offset in the message set
* @param lastOffset The last offset in the message set
* @param codec The codec used in the message set
* @param count The number of messages
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
*/
case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean)
/**
* Validate the following:
* 1. each message is not too large
* 2. each message matches its CRC
* <ol>
* <li> each message is not too large
* <li> each message matches its CRC
* </ol>
*
* Also compute the following quantities:
* 1. First offset in the message set
* 2. Last offset in the message set
* 3. Number of messages
* 4. Whether the offsets are monotonically increasing
* 5. Whether any compression codec is used (if many are used, then the last one is given)
* <ol>
* <li> First offset in the message set
* <li> Last offset in the message set
* <li> Number of messages
* <li> Whether the offsets are monotonically increasing
* <li> Whether any compression codec is used (if many are used, then the last one is given)
* </ol>
*/
private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): MessageSetAppendInfo = {
private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
var messageCount = 0
var firstOffset, lastOffset = -1L
var codec: CompressionCodec = NoCompressionCodec
@ -332,11 +254,13 @@ private[kafka] class Log(val dir: File,
if(messageCodec != NoCompressionCodec)
codec = messageCodec
}
MessageSetAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic)
LogAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic)
}
/**
* Trim any invalid bytes from the end of this message set (if there are any)
* @param messages The message set to trim
* @return A trimmed message set. This may be the same as what was passed in or it may not.
*/
private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
val messageSetValidBytes = messages.validBytes
@ -353,118 +277,131 @@ private[kafka] class Log(val dir: File,
}
/**
* Read a message set from the log.
* startOffset - The logical offset to begin reading at
* maxLength - The maximum number of bytes to read
* maxOffset - The maximum logical offset to include in the resulting message set
* Read messages from the log
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
* @param maxOffset The offset to read up to, exclusive (i.e. the first offset NOT included in the resulting message set).
*
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
* @return The messages read
*/
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
val view = segments.view
// check if the offset is valid and in range
val first = view.head.start
val next = nextOffset.get
if(startOffset == next)
return MessageSet.Empty
else if(startOffset > next || startOffset < first)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, first, next))
// Do the read on the segment with a base offset less than the target offset
// TODO: to handle sparse offsets, we need to skip to the next segment if this read doesn't find anything
Log.findRange(view, startOffset, view.length) match {
case None => throw new OffsetOutOfRangeException("Offset is earlier than the earliest offset")
case Some(segment) => segment.read(startOffset, maxLength, maxOffset)
var entry = segments.floorEntry(startOffset)
// attempt to read beyond the log end offset is an error
if(startOffset > next || entry == null)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
// do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
while(entry != null) {
val messages = entry.getValue.read(startOffset, maxOffset, maxLength)
if(messages == null)
entry = segments.higherEntry(entry.getKey)
else
return messages
}
// okay we are beyond the end of the last segment but less than the log end offset
MessageSet.Empty
}
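The read path now uses a java.util.concurrent.ConcurrentSkipListMap keyed by segment base offset: floorEntry selects the segment whose base offset is at most the target, and higherEntry advances when that segment yields nothing. A small usage sketch of just those map operations (plain strings stand in for LogSegments):
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupDemo extends App {
  val segments = new ConcurrentSkipListMap[Long, String]()
  segments.put(0L,   "00000000000000000000.log")
  segments.put(100L, "00000000000000000100.log")
  segments.put(250L, "00000000000000000250.log")

  // the segment that could contain offset 175 is the one with the greatest base offset <= 175
  val entry = segments.floorEntry(175L)
  println("floor of 175 -> " + entry.getValue)   // 00000000000000000100.log

  // if that segment held nothing past the target, advance to the next segment
  val next = segments.higherEntry(entry.getKey)
  println("next segment -> " + next.getValue)    // 00000000000000000250.log
}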
/**
* Delete any log segments matching the given predicate function
* Delete any log segments matching the given predicate function,
* starting with the oldest segment and moving forward until a segment doesn't match.
* @param predicate A function that takes in a single log segment and returns true iff it is deletable
* @return The number of segments deleted
*/
def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
lock synchronized {
val view = segments.view
val deletable = view.takeWhile(predicate)
for(seg <- deletable)
seg.deleted = true
var numToDelete = deletable.size
// if we are deleting everything, create a new empty segment
if(numToDelete == view.size) {
if (view(numToDelete - 1).size > 0)
def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
// find any segments that match the user-supplied predicate UNLESS it is the final segment
// and it is empty (since we would just end up re-creating it)
val lastSegment = activeSegment
var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
val numToDelete = deletable.size
if(numToDelete > 0) {
lock synchronized {
// we must always have at least one segment, so if we are going to delete all the segments, create a new one first
if(segments.size == numToDelete)
roll()
else {
// If the last segment to be deleted is empty and we roll the log, the new segment will have the same
// file name. So simply reuse the last segment and reset the modified time.
view(numToDelete - 1).messageSet.file.setLastModified(time.milliseconds)
numToDelete -=1
}
// remove the segments for lookups
deletable.foreach(d => segments.remove(d.baseOffset))
}
segments.trunc(numToDelete)
// do not lock around actual file deletion, it isn't O(1) on many filesystems
deletable.foreach(_.delete())
}
numToDelete
}
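Retention now goes through this single method: the caller supplies only a predicate, and segments are deleted oldest-first until one fails the test, sparing an empty final segment that would just be re-created. A toy sketch of the shape of that scan (Seg is a made-up stand-in):
case class Seg(baseOffset: Long, lastModifiedMs: Long, sizeInBytes: Long)

object RetentionSketch {
  // Segments are ordered oldest-first; stop at the first one the predicate rejects,
  // and spare the final segment when it is empty (it would only be re-created).
  def deletable(segments: Seq[Seg], predicate: Seg => Boolean): Seq[Seg] =
    segments.lastOption match {
      case None       => Seq.empty
      case Some(last) =>
        segments.takeWhile(s => predicate(s) && (s.baseOffset != last.baseOffset || s.sizeInBytes > 0))
    }

  // example predicate: segments not modified for more than `retentionMs`
  def olderThan(retentionMs: Long, nowMs: Long): Seg => Boolean =
    s => nowMs - s.lastModifiedMs > retentionMs
}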
/**
* Get the size of the log in bytes
* The size of the log in bytes
*/
def size: Long = segments.view.foldLeft(0L)(_ + _.size)
def size: Long = logSegments.map(_.size).sum
/**
* Get the offset of the next message that will be appended
* The offset of the next message that will be appended to the log
*/
def logEndOffset: Long = nextOffset.get
/**
* Roll the log over if necessary
* Roll the log over to a new empty log segment if necessary
* @return The currently active segment after (perhaps) rolling to a new segment
*/
private def maybeRoll(segment: LogSegment): LogSegment = {
if ((segment.messageSet.sizeInBytes > maxLogFileSize) ||
((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) ||
private def maybeRoll(): LogSegment = {
val segment = activeSegment
if ((segment.size > maxSegmentSize) ||
(segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) ||
segment.index.isFull)
roll()
else
segment
}
/**
* Create a new segment and make it active, and return it
* Roll the log over to a new active segment starting with the current logEndOffset.
* This will trim the index to the exact size of the number of entries it currently contains.
* @return The newly rolled segment
*/
def roll(): LogSegment = {
lock synchronized {
flush()
rollToOffset(logEndOffset)
}
}
// flush the log to ensure that only the active segment needs to be recovered
if(!segments.isEmpty())
flush()
/**
* Roll the log over to the given new offset value
*/
private def rollToOffset(newOffset: Long): LogSegment = {
val logFile = logFilename(dir, newOffset)
val indexFile = indexFilename(dir, newOffset)
for(file <- List(logFile, indexFile); if file.exists) {
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
file.delete()
val newOffset = logEndOffset
val logFile = logFilename(dir, newOffset)
val indexFile = indexFilename(dir, newOffset)
for(file <- List(logFile, indexFile); if file.exists) {
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
file.delete()
}
debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
segments.lastEntry() match {
case null =>
case entry => entry.getValue.index.trimToValidSize()
}
val segment = new LogSegment(dir,
startOffset = newOffset,
indexIntervalBytes = indexIntervalBytes,
maxIndexSize = maxIndexSize)
val prev = segments.put(segment.baseOffset, segment)
if(prev != null)
throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
segment
}
debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
segments.view.lastOption match {
case Some(segment) => segment.index.trimToValidSize()
case None =>
}
val segmentsView = segments.view
if(segmentsView.size > 0 && segmentsView.last.start == newOffset)
throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
val segment = new LogSegment(dir,
startOffset = newOffset,
indexIntervalBytes = indexIntervalBytes,
maxIndexSize = maxIndexSize)
segments.append(segment)
segment
}
/**
* Flush the log if necessary
* @param numberOfMessages The number of messages that are being appended
*/
private def maybeFlush(numberOfMessages : Int) {
if(unflushed.addAndGet(numberOfMessages) >= flushInterval)
@ -472,144 +409,128 @@ private[kafka] class Log(val dir: File,
}
/**
* Flush this log file to the physical disk
* Flush this log file and associated index to the physical disk
*/
def flush() : Unit = {
if (unflushed.get == 0)
return
debug("Flushing log '" + name + "' last flushed: " + lastFlushTime + " current time: " +
time.milliseconds + " unflushed = " + unflushed.get)
lock synchronized {
debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
time.milliseconds)
segments.view.last.flush()
activeSegment.flush()
unflushed.set(0)
lastflushedTime.set(time.milliseconds)
}
}
def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
val segsArray = segments.view
var offsetTimeArray: Array[(Long, Long)] = null
if(segsArray.last.size > 0)
offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
else
offsetTimeArray = new Array[(Long, Long)](segsArray.length)
for(i <- 0 until segsArray.length)
offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified)
if(segsArray.last.size > 0)
offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds)
var startIndex = -1
timestamp match {
case OffsetRequest.LatestTime =>
startIndex = offsetTimeArray.length - 1
case OffsetRequest.EarliestTime =>
startIndex = 0
case _ =>
var isFound = false
debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
startIndex = offsetTimeArray.length - 1
while (startIndex >= 0 && !isFound) {
if (offsetTimeArray(startIndex)._2 <= timestamp)
isFound = true
else
startIndex -=1
}
}
val retSize = maxNumOffsets.min(startIndex + 1)
val ret = new Array[Long](retSize)
for(j <- 0 until retSize) {
ret(j) = offsetTimeArray(startIndex)._1
startIndex -= 1
}
// ensure that the returned seq is in descending order of offsets
ret.toSeq.sortBy(- _)
}
/**
* Delete this log from the filesystem entirely
*/
def delete(): Unit = {
deleteSegments(segments.contents.get())
logSegments.foreach(_.delete())
Utils.rm(dir)
}
/* Attempts to delete all provided segments from a log and returns how many it was able to */
def deleteSegments(segments: Seq[LogSegment]): Int = {
var total = 0
for(segment <- segments) {
info("Deleting log segment " + segment.start + " from " + name)
val deletedLog = segment.messageSet.delete()
val deletedIndex = segment.index.delete()
if(!deletedIndex || !deletedLog) {
throw new KafkaStorageException("Deleting log segment " + segment.start + " failed.")
} else {
total += 1
}
}
total
}
/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
* @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
*/
def truncateTo(targetOffset: Long) {
info("Truncating log %s to offset %d.".format(name, targetOffset))
if(targetOffset < 0)
throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
if(targetOffset > logEndOffset) {
info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1))
return
}
lock synchronized {
val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
val viewSize = segments.view.size
val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
/* We should not hit this error because segments.view is locked in markedDeletedWhile() */
if(numSegmentsDeleted != segmentsToBeDeleted.size)
error("Failed to delete some segments when attempting to truncate to offset " + targetOffset +")")
if (numSegmentsDeleted == viewSize) {
segments.trunc(segments.view.size)
rollToOffset(targetOffset)
this.nextOffset.set(targetOffset)
if(segments.firstEntry.getValue.baseOffset > targetOffset) {
truncateFullyAndStartAt(targetOffset)
} else {
if(targetOffset > logEndOffset) {
error("Target offset %d cannot be greater than the last message offset %d in the log %s".
format(targetOffset, logEndOffset, segments.view.last.messageSet.file.getAbsolutePath))
} else {
// find the log segment that has this hw
val segmentToBeTruncated = findRange(segments.view, targetOffset)
segmentToBeTruncated match {
case Some(segment) =>
val truncatedSegmentIndex = segments.view.indexOf(segment)
segments.truncLast(truncatedSegmentIndex)
segment.truncateTo(targetOffset)
this.nextOffset.set(targetOffset)
info("Truncated log segment %s to target offset %d".format(segments.view.last.messageSet.file.getAbsolutePath, targetOffset))
case None => // nothing to do
}
}
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
deletable.foreach(s => segments.remove(s.baseOffset))
deletable.foreach(_.delete())
activeSegment.truncateTo(targetOffset)
this.nextOffset.set(targetOffset)
}
}
}
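Truncation above splits into three cases: a target beyond the log end is a no-op, a target before the first segment wipes the log and restarts it, and anything else drops the segments entirely above the target and trims the remaining active segment. A compact sketch of that decision (the types and names are invented for illustration):
object TruncateSketch {
  sealed trait Plan
  case object NoOp extends Plan
  case object WipeAndRestart extends Plan
  case class DropAboveAndTrim(segmentsToDrop: Seq[Long]) extends Plan

  // Decide what truncating to `target` means, given the log end offset and the
  // base offsets of the existing segments, oldest first.
  def planTruncate(target: Long, logEndOffset: Long, baseOffsets: Seq[Long]): Plan = {
    require(target >= 0, "Cannot truncate to a negative offset")
    if (target > logEndOffset) NoOp
    else if (baseOffsets.nonEmpty && baseOffsets.head > target) WipeAndRestart
    else DropAboveAndTrim(baseOffsets.filter(_ > target))
  }
}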
/**
* Truncate all segments in the log and start a new segment on a new offset
/**
* Delete all data in the log and start at the new offset
* @param newOffset The new offset to start the log with
*/
def truncateAndStartWithNewOffset(newOffset: Long) {
def truncateFullyAndStartAt(newOffset: Long) {
debug("Truncate and start log '" + name + "' to " + newOffset)
lock synchronized {
val deletedSegments = segments.trunc(segments.view.size)
debug("Truncate and start log '" + name + "' to " + newOffset)
segments.append(new LogSegment(dir,
newOffset,
indexIntervalBytes = indexIntervalBytes,
maxIndexSize = maxIndexSize))
deleteSegments(deletedSegments)
val segmentsToDelete = logSegments.toList
segments.clear()
segmentsToDelete.foreach(_.delete())
segments.put(newOffset,
new LogSegment(dir,
newOffset,
indexIntervalBytes = indexIntervalBytes,
maxIndexSize = maxIndexSize))
this.nextOffset.set(newOffset)
}
}
def topicName():String = {
name.substring(0, name.lastIndexOf("-"))
}
def getLastFlushedTime():Long = {
return lastflushedTime.get
}
/**
* The time this log is last known to have been fully flushed to disk
*/
def lastFlushTime(): Long = lastflushedTime.get
/**
* The active segment that is currently taking appends
*/
def activeSegment = segments.lastEntry.getValue
/**
* All the log segments in this log ordered from oldest to newest
*/
def logSegments: Iterable[LogSegment] = asIterable(segments.values)
override def toString() = "Log(" + this.dir + ")"
}
/**
* Helper functions for logs
*/
object Log {
val LogFileSuffix = ".log"
val IndexFileSuffix = ".index"
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically.
* @param offset The offset to use in the file name
* @return The filename
*/
def filenamePrefixFromOffset(offset: Long): String = {
val nf = NumberFormat.getInstance()
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
nf.format(offset)
}
/**
* Construct a log file name in the given dir with the given base offset
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
*/
def logFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
/**
* Construct an index file name in the given dir using the given base offset
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
*/
def indexFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
}
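As a quick worked example of the zero-padding scheme these helpers implement (the snippet reuses the same NumberFormat recipe, so it should mirror the committed behaviour):
import java.text.NumberFormat

object FilenameDemo extends App {
  def filenamePrefixFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance()
    nf.setMinimumIntegerDigits(20)
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset)
  }

  // offset 42 becomes "00000000000000000042", so the segment's data and index files
  // ("00000000000000000042.log" / ".index") sort numerically under ls
  println(filenamePrefixFromOffset(42L) + ".log")
}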

View File

@ -20,7 +20,6 @@ package kafka.log
import java.io._
import kafka.utils._
import scala.collection._
import kafka.log.Log._
import kafka.common.{TopicAndPartition, KafkaException}
import kafka.server.KafkaConfig
@ -36,9 +35,9 @@ import kafka.server.KafkaConfig
* A background thread handles log retention by periodically truncating excess log segments.
*/
@threadsafe
private[kafka] class LogManager(val config: KafkaConfig,
scheduler: KafkaScheduler,
private val time: Time) extends Logging {
class LogManager(val config: KafkaConfig,
scheduler: KafkaScheduler,
private val time: Time) extends Logging {
val CleanShutdownFile = ".kafka_cleanshutdown"
val LockFile = ".lock"
@ -62,9 +61,12 @@ private[kafka] class LogManager(val config: KafkaConfig,
loadLogs(logDirs)
/**
* 1. Ensure that there are no duplicates in the directory list
* 2. Create each directory if it doesn't exist
* 3. Check that each path is a readable directory
* Create and check validity of the given directories, specifically:
* <ol>
* <li> Ensure that there are no duplicates in the directory list
* <li> Create each directory if it doesn't exist
* <li> Check that each path is a readable directory
* </ol>
*/
private def createAndValidateLogDirs(dirs: Seq[File]) {
if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
@ -95,7 +97,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
/**
* Recovery and load all logs in the given data directories
* Recover and load all logs in the given data directories
*/
private def loadLogs(dirs: Seq[File]) {
for(dir <- dirs) {
@ -120,8 +122,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
needsRecovery,
config.logIndexMaxSizeBytes,
config.logIndexIntervalBytes,
time,
config.brokerId)
time)
val previous = this.logs.put(topicPartition, log)
if(previous != null)
throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
@ -132,7 +133,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
/**
* Start the log flush thread
* Start the background threads to flush logs and do log cleanup
*/
def startup() {
/* Schedule the cleanup task to delete old logs */
@ -140,14 +141,17 @@ private[kafka] class LogManager(val config: KafkaConfig,
info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
info("Starting log flusher every " + config.flushSchedulerThreadRate +
" ms with the following overrides " + logFlushIntervals)
scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-",
config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
" ms with the following overrides " + logFlushIntervals)
scheduler.scheduleWithRate(flushDirtyLogs,
"kafka-logflusher-",
config.flushSchedulerThreadRate,
config.flushSchedulerThreadRate,
isDaemon = false)
}
}
/**
* Get the log if it exists
* Get the log if it exists, otherwise return None
*/
def getLog(topic: String, partition: Int): Option[Log] = {
val topicAndPartiton = TopicAndPartition(topic, partition)
@ -159,7 +163,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
/**
* Create the log if it does not exist, if it exists just return it
* Create the log if it does not exist, otherwise just return it
*/
def getOrCreateLog(topic: String, partition: Int): Log = {
val topicAndPartition = TopicAndPartition(topic, partition)
@ -195,8 +199,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
needsRecovery = false,
config.logIndexMaxSizeBytes,
config.logIndexIntervalBytes,
time,
config.brokerId)
time)
info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
logs.put(topicAndPartition, log)
log
@ -223,14 +226,6 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
}
def getOffsets(topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
val log = getLog(topicAndPartition.topic, topicAndPartition.partition)
log match {
case Some(l) => l.getOffsetsBefore(timestamp, maxNumOffsets)
case None => getEmptyOffsets(timestamp)
}
}
/**
* Runs through the log removing segments older than a certain age
*/
@ -238,9 +233,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
val startMs = time.milliseconds
val topic = parseTopicPartitionName(log.name).topic
val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs)
val total = log.deleteSegments(toBeDeleted)
total
log.deleteOldSegments(startMs - _.lastModified > logCleanupThresholdMs)
}
/**
@ -250,7 +243,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
val topic = parseTopicPartitionName(log.dir.getName).topic
val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize)
return 0
var diff = log.size - maxLogRetentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
@ -260,9 +254,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
false
}
}
val toBeDeleted = log.markDeletedWhile( shouldDelete )
val total = log.deleteSegments(toBeDeleted)
total
log.deleteOldSegments(shouldDelete)
}
/**
@ -307,19 +299,20 @@ private[kafka] class LogManager(val config: KafkaConfig,
*/
private def flushDirtyLogs() = {
debug("Checking for dirty logs to flush...")
for (log <- allLogs) {
for ((topicAndPartition, log) <- logs) {
try {
val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
var logFlushInterval = config.defaultFlushIntervalMs
if(logFlushIntervals.contains(log.topicName))
logFlushInterval = logFlushIntervals(log.topicName)
debug(log.topicName + " flush interval " + logFlushInterval +
" last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush)
if(logFlushIntervals.contains(topicAndPartition.topic))
logFlushInterval = logFlushIntervals(topicAndPartition.topic)
debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + logFlushInterval +
" last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= logFlushInterval)
log.flush
} catch {
case e =>
error("Error flushing topic " + log.topicName, e)
error("Error flushing topic " + topicAndPartition.topic, e)
e match {
case _: IOException =>
fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
@ -330,11 +323,12 @@ private[kafka] class LogManager(val config: KafkaConfig,
}
}
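The flush decision above boils down to comparing the time since the last flush against a per-topic override, defaulting when none exists. A minimal sketch of that check (names are illustrative):
object FlushPolicySketch {
  // A log is flushed when the time since its last flush exceeds the per-topic
  // override, falling back to the default interval when no override exists.
  def shouldFlush(topic: String,
                  lastFlushMs: Long,
                  nowMs: Long,
                  overrides: Map[String, Long],
                  defaultIntervalMs: Long): Boolean =
    nowMs - lastFlushMs >= overrides.getOrElse(topic, defaultIntervalMs)
}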
/**
* Parse the topic and partition out of the directory name of a log
*/
private def parseTopicPartitionName(name: String): TopicAndPartition = {
val index = name.lastIndexOf('-')
TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
}
def topics(): Iterable[String] = logs.keys.map(_.topic)
}

View File

@ -3,6 +3,7 @@ package kafka.log
import scala.math._
import java.io.File
import kafka.message._
import kafka.common._
import kafka.utils._
/**
@ -12,24 +13,24 @@ import kafka.utils._
* any previous segment.
*
* A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
*
* @param log The message set containing log entries
* @param index The offset index
* @param baseOffset A lower bound on the offsets in this segment
* @param indexIntervalBytes The approximate number of bytes between entries in the index
* @param time The time instance
*/
@nonthreadsafe
class LogSegment(val messageSet: FileMessageSet,
class LogSegment(val log: FileMessageSet,
val index: OffsetIndex,
val start: Long,
val baseOffset: Long,
val indexIntervalBytes: Int,
time: Time) extends Range with Logging {
time: Time) extends Logging {
var firstAppendTime: Option[Long] =
if (messageSet.sizeInBytes > 0)
Some(time.milliseconds)
else
None
var created = time.milliseconds
/* the number of bytes since we last added an entry in the offset index */
var bytesSinceLastIndexEntry = 0
@volatile var deleted = false
private var bytesSinceLastIndexEntry = 0
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) =
this(new FileMessageSet(file = Log.logFilename(dir, startOffset)),
@ -39,49 +40,62 @@ class LogSegment(val messageSet: FileMessageSet,
SystemTime)
/* Return the size in bytes of this log segment */
def size: Long = messageSet.sizeInBytes()
def updateFirstAppendTime() {
if (firstAppendTime.isEmpty)
firstAppendTime = Some(time.milliseconds)
}
def size: Long = log.sizeInBytes()
/**
* Append the given messages starting with the given offset. Add
* an entry to the index if needed.
*
* It is assumed this method is being called from within a lock
* It is assumed this method is being called from within a lock.
*
* @param offset The first offset in the message set.
* @param messages The messages to append.
*/
@nonthreadsafe
def append(offset: Long, messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, messageSet.sizeInBytes()))
trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
// append an entry to the index (if needed)
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
index.append(offset, messageSet.sizeInBytes())
index.append(offset, log.sizeInBytes())
this.bytesSinceLastIndexEntry = 0
}
// append the messages
messageSet.append(messages)
updateFirstAppendTime()
log.append(messages)
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
/**
* Find the physical file position for the least offset >= the given offset. If no offset is found
* that meets this criteria before the end of the log, return null.
* Find the physical file position for the first message with offset >= the requested offset.
*
* The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
* in the file higher than the greatest lower bound from the index.
*
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
* when omitted, the search will begin at the position in the offset index.
*
* @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
*/
private def translateOffset(offset: Long): OffsetPosition = {
@threadsafe
private def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
val mapping = index.lookup(offset)
messageSet.searchFor(offset, mapping.position)
log.searchFor(offset, max(mapping.position, startingFilePosition))
}
/**
* Read a message set from this segment beginning with the first offset
* greater than or equal to the startOffset. The message set will include
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
*
* @param startOffset A lower bound on the first offset to include in the message set we read
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
*
* @return The message set read or null if the startOffset is larger than the largest offset in this log.
*/
def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = {
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = {
if(maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
if(maxSize == 0)
@ -89,9 +103,9 @@ class LogSegment(val messageSet: FileMessageSet,
val startPosition = translateOffset(startOffset)
// if the start position is already off the end of the log, return MessageSet.Empty
// if the start position is already off the end of the log, return null
if(startPosition == null)
return MessageSet.Empty
return null
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
@ -103,23 +117,58 @@ class LogSegment(val messageSet: FileMessageSet,
// there is a max offset, translate it to a file position and use that to calculate the max read size
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
val mapping = translateOffset(offset)
val mapping = translateOffset(offset, startPosition.position)
val endPosition =
if(mapping == null)
messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file
log.sizeInBytes() // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)
}
}
messageSet.read(startPosition.position, length)
log.read(startPosition.position, length)
}
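The length calculation above is the subtle part of a segment read: maxSize always bounds the read, and when a maxOffset is supplied its translated file position bounds it further. A reduced sketch of just that arithmetic (parameter names are invented for illustration):
object ReadLengthSketch {
  // startPosition: file position of the first message to return
  // maxOffsetPosition: the file position that the exclusive maxOffset translates to,
  //                    when one was given (the caller substitutes the file size if
  //                    maxOffset lies past the end of this segment); None otherwise
  def readLength(startPosition: Int, maxSize: Int, maxOffsetPosition: Option[Int]): Int =
    maxOffsetPosition match {
      case None              => maxSize
      case Some(endPosition) => math.min(endPosition - startPosition, maxSize)
    }
}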
/**
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
*
* @param maxMessageSize A bound on the memory allocation in the case of a corrupt message size--we will assume any message larger than this
* is corrupt.
*/
@nonthreadsafe
def recover(maxMessageSize: Int) {
index.truncate()
var validBytes = 0
var lastIndexEntry = 0
val iter = log.iterator(maxMessageSize)
try {
while(iter.hasNext) {
val entry = iter.next
entry.message.ensureValid()
if(validBytes - lastIndexEntry > indexIntervalBytes) {
index.append(entry.offset, validBytes)
lastIndexEntry = validBytes
}
validBytes += MessageSet.entrySize(entry.message)
}
} catch {
case e: InvalidMessageException =>
logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
val truncated = log.sizeInBytes - validBytes
if(truncated > 0)
warn("Truncated " + truncated + " invalid bytes from the log segment %s.".format(log.file.getAbsolutePath))
log.truncateTo(validBytes)
}
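Recovery walks the segment from the front, re-adding sparse index entries as it goes and truncating whatever follows the last message that validates. A stripped-down sketch of the valid-prefix scan (validate and entrySize stand in for the real CRC check and message size computation):
object RecoverySketch {
  // Returns the number of leading bytes whose entries all pass validation;
  // everything after that point would be truncated away.
  def validPrefixBytes(entries: Iterator[Array[Byte]],
                       validate: Array[Byte] => Boolean,
                       entrySize: Array[Byte] => Int): Int = {
    var validBytes = 0
    val valid = entries.takeWhile(validate)
    while (valid.hasNext)
      validBytes += entrySize(valid.next())
    validBytes
  }
}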
override def toString() = "LogSegment(start=" + start + ", size=" + size + ")"
override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
/**
* Truncate off all index and log entries with offsets greater than or equal to the current offset.
* Truncate off all index and log entries with offsets >= the given offset.
* If the given offset is larger than the largest message in this segment, do nothing.
* @param offset The offset to truncate to
*/
@nonthreadsafe
def truncateTo(offset: Long) {
val mapping = translateOffset(offset)
if(mapping == null)
@ -127,29 +176,37 @@ class LogSegment(val messageSet: FileMessageSet,
index.truncateTo(offset)
// after truncation, reset and allocate more space for the (new currently active) index
index.resize(index.maxIndexSize)
messageSet.truncateTo(mapping.position)
if (messageSet.sizeInBytes == 0)
firstAppendTime = None
log.truncateTo(mapping.position)
if (log.sizeInBytes == 0)
created = time.milliseconds
}
/**
* Calculate the offset that would be used for the next message to be appended to this segment.
* Not that this is expensive.
* Note that this is expensive.
*/
@threadsafe
def nextOffset(): Long = {
val ms = read(index.lastOffset, messageSet.sizeInBytes, None)
ms.lastOption match {
case None => start
case Some(last) => last.nextOffset
val ms = read(index.lastOffset, None, log.sizeInBytes)
if(ms == null) {
baseOffset
} else {
ms.lastOption match {
case None => baseOffset
case Some(last) => last.nextOffset
}
}
}
/**
* Flush this log segment to disk
*/
@threadsafe
def flush() {
messageSet.flush()
index.flush()
LogFlushStats.logFlushTimer.time {
log.flush()
index.flush()
}
}
/**
@ -157,7 +214,25 @@ class LogSegment(val messageSet: FileMessageSet,
*/
def close() {
Utils.swallow(index.close)
Utils.swallow(messageSet.close)
Utils.swallow(log.close)
}
/**
* Delete this log segment from the filesystem.
* @throws KafkaStorageException if the delete fails.
*/
def delete() {
val deletedLog = log.delete()
val deletedIndex = index.delete()
if(!deletedLog && log.file.exists)
throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
if(!deletedIndex && index.file.exists)
throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
}
/**
* The last modified time of this log segment as a unix time stamp
*/
def lastModified = log.file.lastModified
}

View File

@ -51,7 +51,7 @@ import kafka.utils._
*/
class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
/* the memory mapping */
/* initialize the memory mapping for this index */
private var mmap: MappedByteBuffer =
{
val newlyCreated = file.createNewFile()
@ -84,10 +84,12 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
}
}
/* the maximum number of entries this index can hold */
/**
* The maximum number of eight-byte entries this index can hold
*/
def maxEntries = mmap.limit / 8
/* the number of entries in the index */
/* the number of eight-byte entries currently in the index */
private var size = new AtomicInteger(mmap.position / 8)
/* the last offset in the index */
@ -108,6 +110,10 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
/**
* Find the largest offset less than or equal to the given targetOffset
* and return a pair holding this offset and its corresponding physical file position.
*
* @param targetOffset The offset to look up.
*
* @return The offset found and the corresponding file position for this offset.
* If the target offset is smaller than the least entry in the index (or the index is empty),
* the pair (baseOffset, 0) is returned.
*/
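The index behind this lookup is a memory-mapped array of eight-byte entries holding offsets relative to baseOffset, which is why a missing entry falls back to (baseOffset, 0). The sketch below decodes one entry under the assumed 4-byte offset delta plus 4-byte position layout:
import java.nio.ByteBuffer

object IndexEntrySketch {
  // Decode the n-th eight-byte entry of an index buffer back into an absolute
  // (offset, file position) pair. The 4-byte relative offset + 4-byte position
  // layout is an assumption for illustration.
  def entryAt(idx: ByteBuffer, baseOffset: Long, n: Int): (Long, Int) = {
    val relativeOffset = idx.getInt(n * 8)
    val filePosition   = idx.getInt(n * 8 + 4)
    (baseOffset + relativeOffset, filePosition)
  }
}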
@ -123,7 +129,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
/**
* Find the slot in which the largest offset less than or equal to the given
* target offset is stored.
* Return -1 if the least entry in the index is larger than the target offset or the index is empty
*
* @param idx The index buffer
* @param targetOffset The offset to look for
*
* @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
*/
private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
// we only store the difference from the baseoffset so calculate that
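For reference, a self-contained sketch of the slot search this comment describes, assuming each eight-byte entry holds a four-byte offset delta (relative to baseOffset) followed by a four-byte file position; the helper names are illustrative, not the committed implementation.
import java.nio.ByteBuffer
// Sketch only: binary search for the largest slot whose stored offset delta is <= the target.
def relativeOffsetAt(idx: ByteBuffer, n: Int): Int = idx.getInt(n * 8)
def slotForSketch(idx: ByteBuffer, targetOffset: Long, baseOffset: Long, entries: Int): Int = {
  val relOffset = targetOffset - baseOffset
  if(entries == 0 || relativeOffsetAt(idx, 0) > relOffset) {
    -1                                        // index is empty or its least entry is already too large
  } else {
    var lo = 0
    var hi = entries - 1
    while(lo < hi) {
      val mid = (lo + hi + 1) / 2             // bias upward so the loop always makes progress
      if(relativeOffsetAt(idx, mid) <= relOffset) lo = mid else hi = mid - 1
    }
    lo                                        // largest slot with offset delta <= relOffset
  }
}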
@ -161,6 +171,8 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
/**
* Get the nth offset mapping from the index
* @param n The entry number in the index
* @return The offset/position pair at that entry
*/
def entry(n: Int): OffsetPosition = {
if(n >= entries)
@ -170,7 +182,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
}
/**
* Append entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
* Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
*/
def append(offset: Long, position: Int) {
this synchronized {
@ -192,7 +204,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
def isFull: Boolean = entries >= this.maxEntries
/**
* Truncate the entire index
* Truncate the entire index, deleting all entries
*/
def truncate() = truncateTo(this.baseOffset)

View File

@ -1 +1,6 @@
The log management system for Kafka.
The log management system for Kafka.
The entry point for this system is LogManager. LogManager is responsible for holding all the logs, and handing them out by topic/partition. It also handles the enforcement of the
flush policy and retention policies.
The Log itself is made up of log segments. Each segment is backed by a FileMessageSet that contains the message data and an OffsetIndex that supports reads by offset into that segment.
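As a rough sketch of how these pieces are used elsewhere in this patch (the topic name and sizes below are arbitrary placeholders, and the snippet is illustrative rather than part of the commit):
import kafka.log.LogManager
import kafka.message.ByteBufferMessageSet
// Sketch only: fetch the Log for a topic/partition from the LogManager, append a
// message set (letting the log assign offsets), and read the appended range back.
def appendAndRead(logManager: LogManager, messages: ByteBufferMessageSet) = {
  val log = logManager.getOrCreateLog("my-topic", 0)       // hypothetical topic, partition 0
  val info = log.append(messages, assignOffsets = true)    // info carries firstOffset/lastOffset
  log.read(info.firstOffset, 4096, None)                   // read up to 4096 bytes starting at firstOffset
}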

View File

@ -92,15 +92,23 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
}
/**
* Print this message set's contents
* Print this message set's contents. If the message set has more than 100 messages, just
* print the first 100.
*/
override def toString: String = {
val builder = new StringBuilder()
builder.append(getClass.getSimpleName + "(")
for(message <- this) {
val iter = this.iterator
var i = 0
while(iter.hasNext && i < 100) {
val message = iter.next
builder.append(message)
builder.append(", ")
if(iter.hasNext)
builder.append(", ")
i += 1
}
if(iter.hasNext)
builder.append("...")
builder.append(")")
builder.toString
}

View File

@ -75,14 +75,14 @@ class ProducerConfig private (val props: VerifiableProperties)
val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null))
/**
* The producer using the zookeeper software load balancer maintains a ZK cache that gets
* updated by the zookeeper watcher listeners. During some events like a broker bounce, the
* producer ZK cache can get into an inconsistent state, for a small time period. In this time
* period, it could end up picking a broker partition that is unavailable. When this happens, the
* ZK cache needs to be updated.
* This parameter specifies the number of times the producer attempts to refresh this ZK cache.
* If a request fails, the producer can automatically retry; this setting controls the number of retry attempts.
* Note that not all errors mean the message was lost--for example, if the network connection drops we will
* get a socket exception even though the broker may have accepted the write--so enabling retries can result in duplicate messages.
*/
val producerRetries = props.getInt("producer.num.retries", 3)
/**
* The amount of time to wait in between retries
*/
val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
}
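As an illustration, these two settings are supplied like any other producer property; the broker list and serializer below are placeholder values, not part of this patch.
import java.util.Properties
import kafka.producer.ProducerConfig
// Sketch only: build a ProducerConfig with explicit retry settings.
val props = new Properties()
props.put("broker.list", "localhost:9092")              // placeholder broker
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.num.retries", "5")                  // retry a failed request up to 5 times
props.put("producer.retry.backoff.ms", "200")           // wait 200 ms between retries
val config = new ProducerConfig(props)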

View File

@ -21,6 +21,7 @@ import kafka.admin.{CreateTopicCommand, AdminUtils}
import kafka.api._
import kafka.message._
import kafka.network._
import kafka.log._
import kafka.utils.{Pool, SystemTime, Logging}
import org.apache.log4j.Logger
import scala.collection._
@ -59,7 +60,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
case e: Throwable =>
@ -243,12 +244,17 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
val log = localReplica.log.get
val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
// update stats
BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).messagesInRate.mark(info.count)
BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(info.count)
// we may need to increment high watermark since ISR could be down to 1
localReplica.partition.maybeIncrementLeaderHW(localReplica)
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
.format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
ProduceResult(topicAndPartition, start, end)
.format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset)
} catch {
case e: KafkaStorageException =>
fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
@ -358,11 +364,11 @@ class KafkaApis(val requestChannel: RequestChannel,
else
replicaManager.getLeaderReplicaIfLocal(topic, partition)
trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
val maxOffsetOpt = if (fromReplicaId == Request.OrdinaryConsumerId) {
Some(localReplica.highWatermark)
} else {
None
}
val maxOffsetOpt =
if (fromReplicaId == Request.OrdinaryConsumerId)
Some(localReplica.highWatermark)
else
None
val messages = localReplica.log match {
case Some(log) =>
log.read(offset, maxSize, maxOffsetOpt)
@ -391,15 +397,18 @@ class KafkaApis(val requestChannel: RequestChannel,
else
replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition)
val offsets = {
val allOffsets = replicaManager.logManager.getOffsets(topicAndPartition,
partitionOffsetRequestInfo.time,
partitionOffsetRequestInfo.maxNumOffsets)
if (!offsetRequest.isFromOrdinaryClient) allOffsets
else {
val allOffsets = fetchOffsets(replicaManager.logManager,
topicAndPartition,
partitionOffsetRequestInfo.time,
partitionOffsetRequestInfo.maxNumOffsets)
if (!offsetRequest.isFromOrdinaryClient) {
allOffsets
} else {
val hw = localReplica.highWatermark
if (allOffsets.exists(_ > hw))
hw +: allOffsets.dropWhile(_ > hw)
else allOffsets
else
allOffsets
}
}
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
@ -412,6 +421,59 @@ class KafkaApis(val requestChannel: RequestChannel,
val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match {
case Some(log) =>
fetchOffsetsBefore(log, timestamp, maxNumOffsets)
case None =>
if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
Seq(0L)
else
Nil
}
}
def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
val segsArray = log.logSegments.toArray
var offsetTimeArray: Array[(Long, Long)] = null
if(segsArray.last.size > 0)
offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
else
offsetTimeArray = new Array[(Long, Long)](segsArray.length)
for(i <- 0 until segsArray.length)
offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
if(segsArray.last.size > 0)
offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
var startIndex = -1
timestamp match {
case OffsetRequest.LatestTime =>
startIndex = offsetTimeArray.length - 1
case OffsetRequest.EarliestTime =>
startIndex = 0
case _ =>
var isFound = false
debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
startIndex = offsetTimeArray.length - 1
while (startIndex >= 0 && !isFound) {
if (offsetTimeArray(startIndex)._2 <= timestamp)
isFound = true
else
startIndex -= 1
}
}
val retSize = maxNumOffsets.min(startIndex + 1)
val ret = new Array[Long](retSize)
for(j <- 0 until retSize) {
ret(j) = offsetTimeArray(startIndex)._1
startIndex -= 1
}
// ensure that the returned seq is in descending order of offsets
ret.toSeq.sortBy(- _)
}
/**
* Service the topic metadata request API

View File

@ -38,7 +38,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var logManager: LogManager = null
var kafkaZookeeper: KafkaZooKeeper = null
var replicaManager: ReplicaManager = null
private var apis: KafkaApis = null
var apis: KafkaApis = null
var kafkaController: KafkaController = null
val kafkaScheduler = new KafkaScheduler(4)
var zkClient: ZkClient = null

View File

@ -27,8 +27,6 @@ trait LeaderElector extends Logging {
def amILeader : Boolean
// def electAndBecomeLeader: Unit
//
def elect: Boolean
def close

View File

@ -66,7 +66,7 @@ class ReplicaFetcherThread(name:String,
)
val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
replica.log.get.truncateAndStartWithNewOffset(offset)
replica.log.get.truncateFullyAndStartAt(offset)
offset
}

View File

@ -54,7 +54,7 @@ import java.util.Properties;
* The user needs to provide the configuration files for the 0.7 consumer and the 0.8 producer. For the 0.8 producer,
* the "serializer.class" field is set to "kafka.serializer.DefaultEncoder" by the code.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public class KafkaMigrationTool
{
private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());

View File

@ -17,27 +17,24 @@
package kafka.utils;
import java.util.Random
import scala.math._
object Throttler extends Logging {
val DefaultCheckIntervalMs = 100L
}
/**
* A class to measure and throttle the rate of some process. The throttler takes a desired rate-per-second
* (the units of the process don't matter, it could be bytes or a count of some other thing), and will sleep for
* an appropraite amount of time when maybeThrottle() is called to attain the desired rate.
* an appropriate amount of time when maybeThrottle() is called to attain the desired rate.
*
* @param desiredRatePerSec: The rate we want to hit in units/sec
* @param checkIntervalMs: The interval at which to check our rate
* @param throttleDown: Does throttling increase or decrease our rate?
* @param time: The time implementation to use
*/
@nonthreadsafe
@threadsafe
class Throttler(val desiredRatePerSec: Double,
val checkIntervalMs: Long,
val throttleDown: Boolean,
val time: Time) {
val time: Time) extends Logging {
private val lock = new Object
private var periodStartNs: Long = time.nanoseconds
@ -65,8 +62,7 @@ class Throttler(val desiredRatePerSec: Double,
val elapsedMs = elapsedNs / Time.NsPerMs
val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
if(sleepTime > 0) {
Throttler.debug("Natural rate is " + rateInSecs + " per second but desired rate is " + desiredRatePerSec +
", sleeping for " + sleepTime + " ms to compensate.")
println("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
time.sleep(sleepTime)
}
}
@ -77,3 +73,26 @@ class Throttler(val desiredRatePerSec: Double,
}
}
object Throttler {
val DefaultCheckIntervalMs = 100L
def main(args: Array[String]) {
val rand = new Random()
val throttler = new Throttler(1000000, 100, true, SystemTime)
var start = System.currentTimeMillis
var total = 0
while(true) {
val value = rand.nextInt(1000)
throttler.maybeThrottle(value)
total += value
val now = System.currentTimeMillis
if(now - start >= 1000) {
println(total)
start = now
total = 0
}
}
}
}

View File

@ -401,8 +401,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
servers.foreach(_.shutdown())
}
private def checkIfReassignPartitionPathExists(): Boolean = {

View File

@ -35,13 +35,21 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
set
}
/**
* Test that the cached size variable matches the actual file size as we append messages
*/
@Test
def testFileSize() {
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
messageSet.append(singleMessageSet("abcd".getBytes()))
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
for(i <- 0 until 20) {
messageSet.append(singleMessageSet("abcd".getBytes))
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
}
}
/**
* Test that adding invalid bytes to the end of the log doesn't break iteration
*/
@Test
def testIterationOverPartialAndTruncation() {
testPartialWrite(0, messageSet)
@ -62,6 +70,9 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
}
/**
* Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel.
*/
@Test
def testIterationDoesntChangePosition() {
val position = messageSet.channel.position
@ -69,39 +80,71 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
assertEquals(position, messageSet.channel.position)
}
/**
* Test a simple append and read.
*/
@Test
def testRead() {
val read = messageSet.read(0, messageSet.sizeInBytes)
var read = messageSet.read(0, messageSet.sizeInBytes)
checkEquals(messageSet.iterator, read.iterator)
val items = read.iterator.toList
val sec = items.tail.head
val read2 = messageSet.read(MessageSet.entrySize(sec.message), messageSet.sizeInBytes)
checkEquals(items.tail.iterator, read2.iterator)
read = messageSet.read(position = MessageSet.entrySize(sec.message), size = messageSet.sizeInBytes)
assertEquals("Try a read starting from the second message", items.tail, read.toList)
read = messageSet.read(MessageSet.entrySize(sec.message), MessageSet.entrySize(sec.message))
assertEquals("Try a read of a single message starting from the second message", List(items.tail.head), read.toList)
}
/**
* Test the MessageSet.searchFor API.
*/
@Test
def testSearch() {
// append a new message with a high offset
val lastMessage = new Message("test".getBytes)
messageSet.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(50), lastMessage))
var physicalOffset = 0
var position = 0
assertEquals("Should be able to find the first message by its offset",
OffsetPosition(0L, physicalOffset),
OffsetPosition(0L, position),
messageSet.searchFor(0, 0))
physicalOffset += MessageSet.entrySize(messageSet.head.message)
position += MessageSet.entrySize(messageSet.head.message)
assertEquals("Should be able to find second message when starting from 0",
OffsetPosition(1L, physicalOffset),
OffsetPosition(1L, position),
messageSet.searchFor(1, 0))
assertEquals("Should be able to find second message starting from its offset",
OffsetPosition(1L, physicalOffset),
messageSet.searchFor(1, physicalOffset))
physicalOffset += MessageSet.entrySize(messageSet.tail.head.message)
assertEquals("Should be able to find third message from a non-existant offset",
OffsetPosition(50L, physicalOffset),
messageSet.searchFor(3, physicalOffset))
assertEquals("Should be able to find third message by correct offset",
OffsetPosition(50L, physicalOffset),
messageSet.searchFor(50, physicalOffset))
OffsetPosition(1L, position),
messageSet.searchFor(1, position))
position += MessageSet.entrySize(messageSet.tail.head.message) + MessageSet.entrySize(messageSet.tail.tail.head.message)
assertEquals("Should be able to find fourth message from a non-existant offset",
OffsetPosition(50L, position),
messageSet.searchFor(3, position))
assertEquals("Should be able to find fourth message by correct offset",
OffsetPosition(50L, position),
messageSet.searchFor(50, position))
}
/**
* Test that the message set iterator obeys start and end slicing
*/
@Test
def testIteratorWithLimits() {
val message = messageSet.toList(1)
val start = messageSet.searchFor(1, 0).position
val size = message.message.size
val slice = messageSet.read(start, size)
assertEquals(List(message), slice.toList)
}
/**
* Test the truncateTo method lops off messages and appropriately updates the size
*/
@Test
def testTruncate() {
val message = messageSet.toList(0)
val end = messageSet.searchFor(1, 0).position
messageSet.truncateTo(end)
assertEquals(List(message), messageSet.toList)
assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
}
}

View File

@ -59,6 +59,9 @@ class LogManagerTest extends JUnit3Suite {
super.tearDown()
}
/**
* Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
*/
@Test
def testCreateLog() {
val log = logManager.getOrCreateLog(name, 0)
@ -67,29 +70,34 @@ class LogManagerTest extends JUnit3Suite {
log.append(TestUtils.singleMessageSet("test".getBytes()))
}
/**
* Test that getLog on a non-existent log returns None and no log is created.
*/
@Test
def testGetLog() {
def testGetNonExistentLog() {
val log = logManager.getLog(name, 0)
assertEquals("No log should be found.", None, log)
val logFile = new File(config.logDirs(0), name + "-0")
assertTrue(!logFile.exists)
}
/**
* Test time-based log cleanup. First append messages, then set the time into the future and run cleanup.
*/
@Test
def testCleanupExpiredSegments() {
val log = logManager.getOrCreateLog(name, 0)
var offset = 0L
for(i <- 0 until 1000) {
var set = TestUtils.singleMessageSet("test".getBytes())
val (start, end) = log.append(set)
offset = end
val info = log.append(set)
offset = info.lastOffset
}
log.flush
assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
// update the last modified time of all log segments
val logSegments = log.segments.view
logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs))
log.logSegments.foreach(_.log.file.setLastModified(time.currentMs))
time.currentMs += maxLogAgeHours*60*60*1000 + 1
logManager.cleanupLogs()
@ -106,6 +114,9 @@ class LogManagerTest extends JUnit3Suite {
log.append(TestUtils.singleMessageSet("test".getBytes()))
}
/**
* Test size-based cleanup. Append messages, then run cleanup and check that segments are deleted.
*/
@Test
def testCleanupSegmentsToMaintainSize() {
val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
@ -130,11 +141,9 @@ class LogManagerTest extends JUnit3Suite {
// add a bunch of messages that should be larger than the retentionSize
for(i <- 0 until 1000) {
val set = TestUtils.singleMessageSet("test".getBytes())
val (start, end) = log.append(set)
offset = start
val info = log.append(set)
offset = info.firstOffset
}
// flush to make sure it's written to disk
log.flush
// should be exactly 100 full segments + 1 new empty one
assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
@ -153,29 +162,33 @@ class LogManagerTest extends JUnit3Suite {
log.append(TestUtils.singleMessageSet("test".getBytes()))
}
/**
* Test that flush is invoked by the background scheduler thread.
*/
@Test
def testTimeBasedFlush() {
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
override val logFileSize = 1024 *1024 *1024
override val flushSchedulerThreadRate = 50
override val flushSchedulerThreadRate = 5
override val defaultFlushIntervalMs = 5
override val flushInterval = Int.MaxValue
override val logRollHours = maxRollInterval
override val flushIntervalMap = Map("timebasedflush" -> 100)
}
logManager = new LogManager(config, scheduler, time)
logManager = new LogManager(config, scheduler, SystemTime)
logManager.startup
val log = logManager.getOrCreateLog(name, 0)
val lastFlush = log.lastFlushTime
for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
log.append(set)
}
val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
ellapsed < 2*config.flushSchedulerThreadRate)
Thread.sleep(config.flushSchedulerThreadRate)
assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
}
/**
* Test that new logs that are created are assigned to the least loaded log directory
*/
@Test
def testLeastLoadedAssignment() {
// create a log manager with multiple data directories
@ -196,6 +209,9 @@ class LogManagerTest extends JUnit3Suite {
}
}
/**
* Test that it is not possible to open two log managers using the same data directory
*/
def testTwoLogManagersUsingSameDirFails() {
try {
new LogManager(logManager.config, scheduler, time)

View File

@ -2,6 +2,9 @@ package kafka.log
import junit.framework.Assert._
import java.util.concurrent.atomic._
import java.io.File
import java.io.RandomAccessFile
import java.util.Random
import org.junit.{Test, After}
import org.scalatest.junit.JUnit3Suite
import kafka.utils.TestUtils
@ -13,6 +16,7 @@ class LogSegmentTest extends JUnit3Suite {
val segments = mutable.ArrayBuffer[LogSegment]()
/* create a segment with the given base offset */
def createSegment(offset: Long): LogSegment = {
val msFile = TestUtils.tempFile()
val ms = new FileMessageSet(msFile)
@ -24,6 +28,7 @@ class LogSegmentTest extends JUnit3Suite {
seg
}
/* create a ByteBufferMessageSet for the given messages starting from the given offset */
def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
offsetCounter = new AtomicLong(offset),
@ -34,17 +39,24 @@ class LogSegmentTest extends JUnit3Suite {
def teardown() {
for(seg <- segments) {
seg.index.delete()
seg.messageSet.delete()
seg.log.delete()
}
}
/**
* A read on an empty log segment should return null
*/
@Test
def testReadOnEmptySegment() {
val seg = createSegment(40)
val read = seg.read(startOffset = 40, maxSize = 300, maxOffset = None)
assertEquals(0, read.size)
assertNull("Read beyond the last offset in the segment should be null", read)
}
/**
* Reading from before the first offset in the segment should return messages
* beginning with the first message in the segment
*/
@Test
def testReadBeforeFirstOffset() {
val seg = createSegment(40)
@ -54,24 +66,40 @@ class LogSegmentTest extends JUnit3Suite {
assertEquals(ms.toList, read.toList)
}
/**
* If we set the startOffset and maxOffset for the read to be the same value
* we should get only the first message in the log
*/
@Test
def testReadSingleMessage() {
val seg = createSegment(40)
val ms = messages(50, "hello", "there")
seg.append(50, ms)
val read = seg.read(startOffset = 41, maxSize = 200, maxOffset = Some(50))
assertEquals(new Message("hello".getBytes), read.head.message)
def testMaxOffset() {
val baseOffset = 50
val seg = createSegment(baseOffset)
val ms = messages(baseOffset, "hello", "there", "beautiful")
seg.append(baseOffset, ms)
def validate(offset: Long) =
assertEquals(ms.filter(_.offset == offset).toList,
seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).toList)
validate(50)
validate(51)
validate(52)
}
/**
* If we read from an offset beyond the last offset in the segment we should get null
*/
@Test
def testReadAfterLast() {
val seg = createSegment(40)
val ms = messages(50, "hello", "there")
seg.append(50, ms)
val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
assertEquals(0, read.size)
assertNull("Read beyond the last offset in the segment should give null", null)
}
/**
* If we read from an offset which doesn't exist we should get a message set beginning
* with the least offset greater than the given startOffset.
*/
@Test
def testReadFromGap() {
val seg = createSegment(40)
@ -83,6 +111,10 @@ class LogSegmentTest extends JUnit3Suite {
assertEquals(ms2.toList, read.toList)
}
/**
* In a loop append two messages then truncate off the second of those messages and check that we can read
* the first but not the second message.
*/
@Test
def testTruncate() {
val seg = createSegment(40)
@ -93,26 +125,33 @@ class LogSegmentTest extends JUnit3Suite {
val ms2 = messages(offset+1, "hello")
seg.append(offset+1, ms2)
// check that we can read back both messages
val read = seg.read(offset, 10000, None)
val read = seg.read(offset, None, 10000)
assertEquals(List(ms1.head, ms2.head), read.toList)
// now truncate off the last message
seg.truncateTo(offset + 1)
val read2 = seg.read(offset, 10000, None)
val read2 = seg.read(offset, None, 10000)
assertEquals(1, read2.size)
assertEquals(ms1.head, read2.head)
offset += 1
}
}
/**
* Test truncating the whole segment, and check that we can reappend with the original offset.
*/
@Test
def testTruncateFull() {
// test the case where we fully truncate the log
val seg = createSegment(40)
seg.append(40, messages(40, "hello", "there"))
seg.truncateTo(0)
assertNull("Segment should be empty.", seg.read(0, None, 1024))
seg.append(40, messages(40, "hello", "there"))
}
/**
* Test that offsets are assigned sequentially and that the nextOffset variable is incremented
*/
@Test
def testNextOffsetCalculation() {
val seg = createSegment(40)
@ -121,4 +160,50 @@ class LogSegmentTest extends JUnit3Suite {
assertEquals(53, seg.nextOffset())
}
/**
* Create a segment with some data and an index. Then corrupt the index,
* and recover the segment; the entries should all be readable.
*/
@Test
def testRecoveryFixesCorruptIndex() {
val seg = createSegment(0)
for(i <- 0 until 100)
seg.append(i, messages(i, i.toString))
val indexFile = seg.index.file
writeNonsense(indexFile, 5, indexFile.length.toInt)
seg.recover(64*1024)
for(i <- 0 until 100)
assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset)
}
/**
* Randomly corrupt a log a number of times and attempt recovery.
*/
@Test
def testRecoveryWithCorruptMessage() {
val rand = new Random(1)
val messagesAppended = 20
for(iteration <- 0 until 10) {
val seg = createSegment(0)
for(i <- 0 until messagesAppended)
seg.append(i, messages(i, i.toString))
val offsetToBeginCorruption = rand.nextInt(messagesAppended)
// start corrupting somewhere in the middle of the chosen record all the way to the end
val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15)
writeNonsense(seg.log.file, position, seg.log.file.length.toInt - position)
seg.recover(64*1024)
assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
seg.delete()
}
}
def writeNonsense(fileName: File, position: Long, size: Int) {
val file = new RandomAccessFile(fileName, "rw")
file.seek(position)
val rand = new Random
for(i <- 0 until size)
file.writeByte(rand.nextInt(255))
file.close()
}
}

View File

@ -54,7 +54,10 @@ class LogTest extends JUnitSuite {
}
}
/** Test that the size and time based log segment rollout works. */
/**
* Tests for time based log roll. This test appends messages then changes the time
* using the mock clock to force the log to roll and checks the number of segments.
*/
@Test
def testTimeBasedLogRoll() {
val set = TestUtils.singleMessageSet("test".getBytes())
@ -70,27 +73,26 @@ class LogTest extends JUnitSuite {
assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
log.append(set)
assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments)
// segment expires in age
time.currentMs += rollMs + 1
log.append(set)
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
for(numSegments <- 2 until 4) {
time.currentMs += rollMs + 1
log.append(set)
assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
}
val numSegments = log.numberOfSegments
time.currentMs += rollMs + 1
val blank = Array[Message]()
log.append(new ByteBufferMessageSet(new Message("blah".getBytes)))
assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
time.currentMs += rollMs + 1
// the last segment expired in age, but was blank. So new segment should not be generated
log.append(new ByteBufferMessageSet())
assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments)
}
/**
* Test that appending more than the maximum segment size rolls the log
*/
@Test
def testSizeBasedLogRoll() {
val set = TestUtils.singleMessageSet("test".getBytes())
val set = TestUtils.singleMessageSet("test".getBytes)
val setSize = set.sizeInBytes
val msgPerSeg = 10
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
@ -106,28 +108,80 @@ class LogTest extends JUnitSuite {
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
}
/**
* Test that we can open and append to an empty log
*/
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
}
@Test
def testAppendAndRead() {
val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val message = new Message(Integer.toString(42).getBytes())
for(i <- 0 until 10)
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
log.flush()
val messages = log.read(0, 1024)
var current = 0
for(curr <- messages) {
assertEquals("Read message should equal written", message, curr.message)
current += 1
}
assertEquals(10, current)
log.append(TestUtils.singleMessageSet("test".getBytes))
}
/**
* This test case appends a bunch of messages and checks that we can read them all back using sequential offsets.
*/
@Test
def testAppendAndReadWithSequentialOffsets() {
val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
for(i <- 0 until messages.length)
log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
for(i <- 0 until messages.length) {
val read = log.read(i, 100, Some(i+1)).head
assertEquals("Offset read should match order appended.", i, read.offset)
assertEquals("Message should match appended.", messages(i), read.message)
}
assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
}
/**
* This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
* from any offset less than the logEndOffset including offsets not appended.
*/
@Test
def testAppendAndReadWithNonSequentialOffsets() {
val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val messages = messageIds.map(id => new Message(id.toString.getBytes))
// now test the case that we give the offsets and use non-sequential offsets
for(i <- 0 until messages.length)
log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
for(i <- 50 until messageIds.max) {
val idx = messageIds.indexWhere(_ >= i)
val read = log.read(i, 100, None).head
assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
assertEquals("Message should match appended.", messages(idx), read.message)
}
}
/**
* This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
* Specifically we create a log where the last message in the first segment has offset 0. If we
* then read offset 1, we should expect this read to come from the second segment, even though the
* first segment has the greatest lower bound on the offset.
*/
@Test
def testReadAtLogGap() {
val log = new Log(logDir, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
// now manually truncate off all but one message from the first segment to create a gap in the messages
log.logSegments.head.truncateTo(1)
assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset)
}
/**
* Test reading at the boundary of the log, specifically
* - reading from the logEndOffset should give an empty message set
* - reading beyond the log end offset should throw an OffsetOutOfRangeException
*/
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
@ -147,14 +201,17 @@ class LogTest extends JUnitSuite {
}
}
/** Test that writing and reading beyond the log size boundary works */
/**
* Test that covers reads and writes on a multisegment log. This test appends a bunch of messages
* and then reads them all back, checking that each message and offset matches what was appended.
*/
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
val offsets = messageSets.map(log.append(_)._1)
val offsets = messageSets.map(log.append(_).firstOffset)
log.flush
/* do successive reads to ensure all our messages are there */
@ -169,7 +226,9 @@ class LogTest extends JUnitSuite {
assertEquals("Should be no more messages", 0, lastRead.size)
}
/** Test the case where we have compressed batches of messages */
/**
* Test reads at offsets that fall within compressed message set boundaries.
*/
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
@ -187,66 +246,46 @@ class LogTest extends JUnitSuite {
assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
}
@Test
def testFindSegment() {
assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
assertEquals("Search in segment list just outside the range of the last segment should find last segment",
9, Log.findRange(makeRanges(5, 9, 12), 12).get.start)
assertEquals("Search in segment list far outside the range of the last segment should find last segment",
9, Log.findRange(makeRanges(5, 9, 12), 100).get.start)
assertEquals("Search in segment list far outside the range of the last segment should find last segment",
None, Log.findRange(makeRanges(5, 9, 12), -1))
assertContains(makeRanges(5, 9, 12), 11)
assertContains(makeRanges(5), 4)
assertContains(makeRanges(5,8), 5)
assertContains(makeRanges(5,8), 6)
}
/**
* Test garbage collecting old segments
*/
@Test
def testEdgeLogRollsStartingAtZero() {
// first test a log segment starting at 0
val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val curOffset = log.logEndOffset
assertEquals(curOffset, 0)
def testThatGarbageCollectingSegmentsDoesntChangeOffset() {
for(messagesToAppend <- List(0, 1, 25)) {
logDir.mkdirs()
// first test a log segment starting at 0
val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
for(i <- 0 until messagesToAppend)
log.append(TestUtils.singleMessageSet(i.toString.getBytes))
var currOffset = log.logEndOffset
assertEquals(currOffset, messagesToAppend)
// time goes by; the log file is deleted
log.markDeletedWhile(_ => true)
// time goes by; the log file is deleted
log.deleteOldSegments(_ => true)
// we now have a new log; the starting offset of the new log should remain 0
assertEquals(curOffset, log.logEndOffset)
log.delete()
}
@Test
def testEdgeLogRollsStartingAtNonZero() {
// second test an empty log segment starting at non-zero
val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val numMessages = 1
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(i.toString.getBytes))
val curOffset = log.logEndOffset
// time goes by; the log file is deleted
log.markDeletedWhile(_ => true)
// we now have a new log
assertEquals(curOffset, log.logEndOffset)
// time goes by; the log file (which is empty) is deleted again
val deletedSegments = log.markDeletedWhile(_ => true)
// we shouldn't delete the last empty log segment.
assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
// we now have a new log
assertEquals(curOffset, log.logEndOffset)
assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset)
assertEquals("We should still have one segment left", 1, log.numberOfSegments)
assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
currOffset,
log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
// cleanup the log
log.delete()
}
}
/**
* We have a max size limit on message appends; check that it is properly enforced by appending a message larger than the
* setting and checking that an exception is thrown.
*/
@Test
def testMessageSizeCheck() {
val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes()))
val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes))
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
@ -259,10 +298,13 @@ class LogTest extends JUnitSuite {
log.append(second)
fail("Second message set should throw MessageSizeTooLargeException.")
} catch {
case e:MessageSizeTooLargeException => // this is good
case e: MessageSizeTooLargeException => // this is good
}
}
/**
* Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
*/
@Test
def testLogRecoversToCorrectOffset() {
val numMessages = 100
@ -273,25 +315,53 @@ class LogTest extends JUnitSuite {
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
val lastIndexOffset = log.segments.view.last.index.lastOffset
val numIndexEntries = log.segments.view.last.index.entries
val lastIndexOffset = log.activeSegment.index.lastOffset
val numIndexEntries = log.activeSegment.index.entries
log.close()
// test non-recovery case
log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
// test
// test recovery case
log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
}
/**
* Test that if we manually delete an index segment it is rebuilt when the log is re-opened
*/
@Test
def testIndexRebuild() {
// publish the messages and close the log
val numMessages = 200
var log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
val indexFiles = log.logSegments.map(_.index.file)
log.close()
// delete all the index files
indexFiles.foreach(_.delete())
// reopen the log
log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages)
assertEquals(i, log.read(i, 100, None).head.offset)
log.close()
}
/**
* Test the Log truncate operations
*/
@Test
def testTruncateTo() {
val set = TestUtils.singleMessageSet("test".getBytes())
@ -329,7 +399,7 @@ class LogTest extends JUnitSuite {
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
assertEquals("Should be back to original size", log.size, size)
log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1))
log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1))
assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1))
assertEquals("Should change log size", log.size, 0)
@ -343,6 +413,9 @@ class LogTest extends JUnitSuite {
assertEquals("Should change log size", log.size, 0)
}
/**
* Verify that when we truncate a log the index of the last segment is resized to the max index size to allow more appends
*/
@Test
def testIndexResizingAtTruncation() {
val set = TestUtils.singleMessageSet("test".getBytes())
@ -357,48 +430,25 @@ class LogTest extends JUnitSuite {
for (i<- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
assertEquals("The index of the first segment should be trim to empty", 0, log.segments.view(0).index.maxEntries)
assertEquals("The index of the first segment should be trim to empty", 0, log.logSegments.toList(0).index.maxEntries)
log.truncateTo(0)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.segments.view(0).index.maxEntries)
assertEquals("The index of segment 1 should be resized to maxIndexSize", log.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
for (i<- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
}
@Test
def testAppendWithoutOffsetAssignment() {
for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
logDir.mkdir()
var log = new Log(logDir,
maxLogFileSize = 64*1024,
maxMessageSize = config.maxMessageSize,
maxIndexSize = 1000,
indexIntervalBytes = 10000,
needsRecovery = true)
val messages = List("one", "two", "three", "four", "five", "six")
val ms = new ByteBufferMessageSet(compressionCodec = codec,
offsetCounter = new AtomicLong(5),
messages = messages.map(s => new Message(s.getBytes)):_*)
val firstOffset = ms.shallowIterator.toList.head.offset
val lastOffset = ms.shallowIterator.toList.last.offset
val (first, last) = log.append(ms, assignOffsets = false)
assertEquals(last + 1, log.logEndOffset)
assertEquals(firstOffset, first)
assertEquals(lastOffset, last)
assertTrue(log.read(5, 64*1024).size > 0)
log.delete()
}
}
/**
* Verify that truncation works correctly after re-opening the log
*/
@Test
def testReopenThenTruncate() {
val set = TestUtils.singleMessageSet("test".getBytes())
// create a log
var log = new Log(logDir,
maxLogFileSize = set.sizeInBytes * 5,
maxSegmentSize = set.sizeInBytes * 5,
maxMessageSize = config.maxMessageSize,
maxIndexSize = 1000,
indexIntervalBytes = 10000,
@ -409,7 +459,7 @@ class LogTest extends JUnitSuite {
log.append(set)
log.close()
log = new Log(logDir,
maxLogFileSize = set.sizeInBytes * 5,
maxSegmentSize = set.sizeInBytes * 5,
maxMessageSize = config.maxMessageSize,
maxIndexSize = 1000,
indexIntervalBytes = 10000,
@ -419,24 +469,4 @@ class LogTest extends JUnitSuite {
assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
}
def assertContains(ranges: Array[Range], offset: Long) = {
Log.findRange(ranges, offset) match {
case Some(range) =>
assertTrue(range + " does not contain " + offset, range.contains(offset))
case None => fail("No range found, but expected to find " + offset)
}
}
class SimpleRange(val start: Long, val size: Long) extends Range
def makeRanges(breaks: Int*): Array[Range] = {
val list = new ArrayList[Range]
var prior = 0
for(brk <- breaks) {
list.add(new SimpleRange(prior, brk - prior))
prior = brk
}
list.toArray(new Array[Range](list.size))
}
}

View File

@ -26,7 +26,7 @@ import org.junit.Test
trait BaseMessageSetTestCases extends JUnitSuite {
val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
val messages = Array(new Message("abcd".getBytes), new Message("efgh".getBytes), new Message("ijkl".getBytes))
def createMessageSet(messages: Seq[Message]): MessageSet

View File

@ -397,6 +397,7 @@ class AsyncProducerTest extends JUnit3Suite {
props.put("serializer.class", classOf[StringEncoder].getName.toString)
props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString)
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
props.put("producer.num.retries", 3.toString)
val config = new ProducerConfig(props)

View File

@ -40,7 +40,7 @@ class SimpleFetchTest extends JUnit3Suite {
val partitionId = 0
/**
* The scenario for this test is that there is one topic, "test-topic", on broker "0" that has
* The scenario for this test is that there is one topic, "test-topic", on one broker "0" that has
* one partition with one follower replica on broker "1". The leader replica on "0"
* has HW of "5" and LEO of "20". The follower on broker "1" has a local replica
* with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
@ -62,6 +62,7 @@ class SimpleFetchTest extends JUnit3Suite {
val log = EasyMock.createMock(classOf[kafka.log.Log])
EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
EasyMock.expect(log)
EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages))
EasyMock.replay(log)
@ -102,33 +103,6 @@ class SimpleFetchTest extends JUnit3Suite {
// make sure the log only reads bytes between 0->HW (5)
EasyMock.verify(log)
// Test offset request from non-replica
val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
val offsetRequest = OffsetRequest(
Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
EasyMock.reset(logManager)
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
EasyMock.replay(replicaManager)
EasyMock.replay(logManager)
apis.handleOffsetRequest(new RequestChannel.Request(processor = 0,
requestKey = 5,
buffer = offsetRequestBB,
startTimeMs = 1))
val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
EasyMock.verify(replicaManager)
EasyMock.verify(logManager)
assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
assertEquals(hw.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
}
/**
@ -203,34 +177,6 @@ class SimpleFetchTest extends JUnit3Suite {
* an offset of 15
*/
EasyMock.verify(log)
// Test offset request from replica
val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
val offsetRequest = OffsetRequest(
Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)),
replicaId = followerReplicaId)
val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
EasyMock.reset(logManager)
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
EasyMock.expect(replicaManager.logManager).andReturn(logManager)
EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
EasyMock.replay(replicaManager)
EasyMock.replay(logManager)
apis.handleOffsetRequest(new RequestChannel.Request(processor = 1,
requestKey = 5,
buffer = offsetRequestBB,
startTimeMs = 1))
val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
EasyMock.verify(replicaManager)
EasyMock.verify(logManager)
assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
assertEquals(leo.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
}
private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,

View File

@ -127,7 +127,6 @@ object TestUtils extends Logging {
props.put("hostname", "localhost")
props.put("port", port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
props.put("log.flush.interval", "1")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("replica.socket.timeout.ms", "1500")
props

View File

@ -33,7 +33,8 @@ class EmbeddedZookeeper(val connectString: String) {
factory.startup(zookeeper)
def shutdown() {
factory.shutdown()
Utils.swallow(zookeeper.shutdown())
Utils.swallow(factory.shutdown())
Utils.rm(logDir)
Utils.rm(snapshotDir)
}

View File

@ -19,7 +19,7 @@ package kafka.zk
import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, TestZKUtils}
import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
trait ZooKeeperTestHarness extends JUnit3Suite {
val zkConnect: String = TestZKUtils.zookeeperConnect
@ -36,8 +36,8 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
override def tearDown() {
super.tearDown
zkClient.close()
zookeeper.shutdown()
Utils.swallow(zkClient.close())
Utils.swallow(zookeeper.shutdown())
}
}

View File

@ -25,7 +25,6 @@ import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
public class Consumer extends Thread