mirror of https://github.com/apache/kafka.git
KAFKA-348 rebase branch from trunk patch by Jun Rao reviewed by Joe Stein
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1351112 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ebcf7fc10d
commit
efdc57bc58
|
@ -21,10 +21,11 @@ import kafka.api.OffsetRequest
|
|||
import java.io.{IOException, RandomAccessFile, File}
|
||||
import java.util.{Comparator, Collections, ArrayList}
|
||||
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
|
||||
import kafka.message.{MessageSet, InvalidMessageException, FileMessageSet}
|
||||
import kafka.utils._
|
||||
import java.text.NumberFormat
|
||||
import kafka.common.OffsetOutOfRangeException
|
||||
import kafka.server.BrokerTopicStat
|
||||
import kafka.common.{InvalidMessageSizeException, OffsetOutOfRangeException}
|
||||
import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet}
|
||||
|
||||
object Log {
|
||||
val FileSuffix = ".kafka"
|
||||
|
@ -214,7 +215,7 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In
|
|||
* Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
|
||||
* Returns the offset at which the messages are written.
|
||||
*/
|
||||
def append(messages: MessageSet): Unit = {
|
||||
def append(messages: ByteBufferMessageSet): Unit = {
|
||||
// validate the messages
|
||||
var numberOfMessages = 0
|
||||
for(messageAndOffset <- messages) {
|
||||
|
@ -223,13 +224,25 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In
|
|||
numberOfMessages += 1;
|
||||
}
|
||||
|
||||
BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages)
|
||||
BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
|
||||
logStats.recordAppendedMessages(numberOfMessages)
|
||||
|
||||
// truncate the message set's buffer upto validbytes, before appending it to the on-disk log
|
||||
val validByteBuffer = messages.getBuffer.duplicate()
|
||||
val messageSetValidBytes = messages.validBytes
|
||||
if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
|
||||
throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
|
||||
" Message set cannot be appended to log. Possible causes are corrupted produce requests")
|
||||
|
||||
validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
|
||||
val validMessages = new ByteBufferMessageSet(validByteBuffer)
|
||||
|
||||
// they are valid, insert them in the log
|
||||
lock synchronized {
|
||||
try {
|
||||
val segment = segments.view.last
|
||||
segment.messageSet.append(messages)
|
||||
segment.messageSet.append(validMessages)
|
||||
maybeFlush(numberOfMessages)
|
||||
maybeRoll(segment)
|
||||
}
|
||||
|
@ -262,10 +275,17 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In
|
|||
val deletable = view.takeWhile(predicate)
|
||||
for(seg <- deletable)
|
||||
seg.deleted = true
|
||||
val numToDelete = deletable.size
|
||||
var numToDelete = deletable.size
|
||||
// if we are deleting everything, create a new empty segment
|
||||
if(numToDelete == view.size) {
|
||||
if (view(numToDelete - 1).size > 0)
|
||||
roll()
|
||||
else {
|
||||
// If the last segment to be deleted is empty and we roll the log, the new segment will have the same
|
||||
// file name. So simply reuse the last segment and reset the modified time.
|
||||
view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds)
|
||||
numToDelete -=1
|
||||
}
|
||||
}
|
||||
segments.trunc(numToDelete)
|
||||
}
|
||||
|
@ -309,9 +329,12 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In
|
|||
*/
|
||||
def roll() {
|
||||
lock synchronized {
|
||||
val last = segments.view.last
|
||||
val newOffset = nextAppendOffset
|
||||
val newFile = new File(dir, nameFromOffset(newOffset))
|
||||
if (newFile.exists) {
|
||||
warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
|
||||
newFile.delete()
|
||||
}
|
||||
debug("Rolling log '" + name + "' to " + newFile.getName())
|
||||
segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
|
||||
}
|
||||
|
|
|
@ -53,6 +53,7 @@ class BlockingChannel( val host: String,
|
|||
channel.configureBlocking(true)
|
||||
channel.socket.setSoTimeout(readTimeoutMs)
|
||||
channel.socket.setKeepAlive(true)
|
||||
channel.socket.setTcpNoDelay(true)
|
||||
channel.connect(new InetSocketAddress(host, port))
|
||||
|
||||
writeChannel = channel
|
||||
|
|
|
@ -126,6 +126,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
|
|||
for(topicData <- request.data) {
|
||||
for(partitionData <- topicData.partitionData) {
|
||||
msgIndex += 1
|
||||
BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
|
||||
BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
|
||||
try {
|
||||
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
|
||||
val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
|
||||
|
@ -136,6 +138,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
|
|||
trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
|
||||
} catch {
|
||||
case e =>
|
||||
BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
|
||||
BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
|
||||
error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
|
||||
e match {
|
||||
case _: IOException =>
|
||||
|
@ -239,12 +243,16 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
|
|||
for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
|
||||
val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
|
||||
case Left(err) =>
|
||||
BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
|
||||
BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
|
||||
fetchRequest.replicaId match {
|
||||
case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
|
||||
case _ =>
|
||||
new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
|
||||
}
|
||||
case Right(messages) =>
|
||||
BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
|
||||
BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
|
||||
val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
|
||||
assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) +
|
||||
" must exist on leader broker %d".format(logManager.config.brokerId))
|
||||
|
|
|
@ -185,7 +185,7 @@ class LogTest extends JUnitSuite {
|
|||
val deletedSegments = log.markDeletedWhile(_ => true)
|
||||
|
||||
// we shouldn't delete the last empty log segment.
|
||||
assertTrue("We shouldn't delete the last empty log segment", log.segments.view.size == 1)
|
||||
assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
|
||||
|
||||
// we now have a new log
|
||||
assertEquals(curOffset, log.nextAppendOffset)
|
||||
|
|
Loading…
Reference in New Issue