KAFKA-1670; Corrupt log files for segment.bytes values close to Int.MaxValue; patched by Sriharsha Chintalapani; reviewed by Jay Kreps and Jun Rao

Sriharsha Chintalapani 2014-10-09 08:05:32 -07:00 committed by Jun Rao
parent 68b9f7716d
commit c940470e32
8 changed files with 117 additions and 23 deletions
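Background for the fix: config.segmentSize and the message-set sizes in this diff are 32-bit Ints, so with segment.bytes configured near Int.MaxValue the old roll check (roll only after the segment already exceeds the limit) lets one more append push the segment size past Int.MaxValue, wrapping it negative and corrupting the segment. A minimal sketch of the wraparound, with illustrative values and assuming plain Int arithmetic as in the code below (not Kafka code):

// Sketch: how an append can wrap a 32-bit segment size.
object OverflowSketch extends App {
  val segmentBytes = Int.MaxValue        // segment.bytes near Int.MaxValue
  val currentSize  = Int.MaxValue - 50   // bytes already in the active segment
  val appendBytes  = 100                 // size of the incoming message set

  // Old condition: roll only once the segment already exceeds the limit,
  // so this append is admitted...
  val oldCheckRolls = currentSize > segmentBytes               // false
  // ...and the resulting Int size wraps negative, corrupting the segment.
  val wrappedSize = currentSize + appendBytes                  // -2147483599
  // New condition: roll first when the append itself would exceed the limit.
  val newCheckRolls = currentSize > segmentBytes - appendBytes // true

  println(s"oldRolls=$oldCheckRolls wrapped=$wrappedSize newRolls=$newCheckRolls")
}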

org/apache/kafka/common/errors/RecordBatchTooLargeException.java (new file) View File

@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
 * This record batch is larger than the maximum allowable size
 */
public class RecordBatchTooLargeException extends ApiException {

    private static final long serialVersionUID = 1L;

    public RecordBatchTooLargeException() {
        super();
    }

    public RecordBatchTooLargeException(String message, Throwable cause) {
        super(message, cause);
    }

    public RecordBatchTooLargeException(String message) {
        super(message);
    }

    public RecordBatchTooLargeException(Throwable cause) {
        super(cause);
    }
}

org/apache/kafka/common/protocol/Errors.java View File

@@ -43,7 +43,8 @@ public enum Errors {
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
// TODO: errorCode 14, 15, 16
-INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic."));
+INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
+RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included a message batch larger than the configured segment size on the server."));
private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();

kafka/common/ErrorMapping.scala View File

@@ -19,7 +19,6 @@ package kafka.common
import kafka.message.InvalidMessageException
import java.nio.ByteBuffer
import java.lang.Throwable
-import scala.Predef._
/**
@@ -47,6 +46,7 @@ object ErrorMapping {
val ConsumerCoordinatorNotAvailableCode: Short = 15
val NotCoordinatorForConsumerCode: Short = 16
val InvalidTopicCode : Short = 17
+val MessageSetSizeTooLargeCode: Short = 18
private val exceptionToCode =
Map[Class[Throwable], Short](
@@ -65,7 +65,8 @@ object ErrorMapping {
classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode,
-classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode
+classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
+classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode
).withDefaultValue(UnknownCode)
/* invert the mapping */
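For illustration, a sketch of how the new entry is expected to round-trip through the mapping; ErrorMapping.codeFor and ErrorMapping.UnknownCode are assumed from the surrounding codebase rather than shown in this diff:

// Sketch: looking up the new error code through ErrorMapping.
import kafka.common.{ErrorMapping, MessageSetSizeTooLargeException}

object ErrorMappingSketch extends App {
  val code = ErrorMapping.codeFor(
    classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]])
  assert(code == ErrorMapping.MessageSetSizeTooLargeCode) // 18

  // Unmapped exceptions fall back to UnknownCode via withDefaultValue.
  val unknown = ErrorMapping.codeFor(
    classOf[IllegalStateException].asInstanceOf[Class[Throwable]])
  assert(unknown == ErrorMapping.UnknownCode)
}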

kafka/common/MessageSetSizeTooLargeException.scala (new file) View File

@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common

class MessageSetSizeTooLargeException(message: String) extends RuntimeException(message) {
  def this() = this(null)
}

kafka/log/Log.scala View File

@@ -252,9 +252,6 @@ class Log(val dir: File,
lock synchronized {
appendInfo.firstOffset = nextOffsetMetadata.messageOffset
-// maybe roll the log if this segment is full
-val segment = maybeRoll()
if(assignOffsets) {
// assign offsets to the message set
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
@@ -282,6 +279,16 @@ class Log(val dir: File,
}
}
+// check that the message set size does not exceed config.segmentSize
+if(validMessages.sizeInBytes > config.segmentSize) {
+  throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
+    .format(validMessages.sizeInBytes, config.segmentSize))
+}
+// maybe roll the log if this segment is full
+val segment = maybeRoll(validMessages.sizeInBytes)
// now append to the log
segment.append(appendInfo.firstOffset, validMessages)
@@ -489,12 +496,20 @@ class Log(val dir: File,
def logEndOffset: Long = nextOffsetMetadata.messageOffset
/**
-* Roll the log over to a new empty log segment if necessary
+* Roll the log over to a new empty log segment if necessary.
+*
+* The log segment will be rolled if one of the following conditions is met:
+* <ol>
+* <li> The log segment is full
+* <li> The maxTime has elapsed
+* <li> The index is full
+* </ol>
+*
+* @param messagesSize The size in bytes of the message set about to be appended
+* @return The currently active segment after (perhaps) rolling to a new segment
*/
-private def maybeRoll(): LogSegment = {
+private def maybeRoll(messagesSize: Int): LogSegment = {
val segment = activeSegment
-if (segment.size > config.segmentSize ||
+if (segment.size > config.segmentSize - messagesSize ||
segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
segment.index.isFull) {
debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."

kafka/log/LogManagerTest.scala View File

@@ -114,7 +114,7 @@ class LogManagerTest extends JUnit3Suite {
val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
logManager.shutdown()
-val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
+val config = logConfig.copy(segmentSize = 10 * setSize, retentionSize = 5L * 10L * setSize + 10L)
logManager = createLogManager()
logManager.startup
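The config change compensates for the earlier-rolling condition so the test still gets exactly 10 message sets per segment; the same adjustment appears in LogTest below. A quick check of the arithmetic (the setSize value here is illustrative; the test derives it from singleMessageSet):

// Both configs yield 10 message sets per segment under their respective roll rules.
object SegmentCountSketch extends App {
  val setSize = 30 // illustrative bytes per message set
  // Smallest n for which the roll condition fires after n sets have been
  // appended; the segment therefore holds n sets before rolling.
  def setsPerSegment(rolls: Int => Boolean): Int =
    Iterator.from(1).find(n => rolls(n * setSize)).get

  val oldCount = setsPerSegment(_ > 10 * (setSize - 1))     // old config, old rule
  val newCount = setsPerSegment(_ > 10 * setSize - setSize) // new config, new rule
  assert(oldCount == 10 && newCount == 10)
}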

kafka/log/LogTest.scala View File

@@ -18,15 +18,13 @@
package kafka.log
import java.io._
-import java.util.ArrayList
import java.util.concurrent.atomic._
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.message._
-import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
+import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException}
import kafka.utils._
-import scala.Some
import kafka.server.KafkaConfig
class LogTest extends JUnitSuite {
@@ -239,7 +237,7 @@ class LogTest extends JUnitSuite {
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
-val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time)
+val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
@@ -286,7 +284,26 @@ class LogTest extends JUnitSuite {
}
/**
-* We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
+* A message set may not exceed config.segmentSize; check that this is properly enforced by
+* appending a message set larger than config.segmentSize and verifying that an exception is thrown.
*/
+@Test
+def testMessageSetSizeCheck() {
+  val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message("You".getBytes), new Message("bethe".getBytes))
+  // configure the segment size just below the message set size, then append it
+  val configSegmentSize = messageSet.sizeInBytes - 1
+  val log = new Log(logDir, logConfig.copy(segmentSize = configSegmentSize), recoveryPoint = 0L, time.scheduler, time = time)
+  try {
+    log.append(messageSet)
+    fail("message set should throw MessageSetSizeTooLargeException.")
+  } catch {
+    case e: MessageSetSizeTooLargeException => // this is good
+  }
+}
+/**
+* We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
* setting and checking that an exception is thrown.
*/
@Test
@@ -305,10 +322,9 @@ class LogTest extends JUnitSuite {
log.append(second)
fail("Second message set should throw MessageSizeTooLargeException.")
} catch {
case e: MessageSizeTooLargeException => // this is good
}
}
/**
* Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
*/
@@ -375,7 +391,7 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
val setSize = set.sizeInBytes
val msgPerSeg = 10
-val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
// create a log
val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
@@ -429,7 +445,7 @@ class LogTest extends JUnitSuite {
val set = TestUtils.singleMessageSet("test".getBytes())
val setSize = set.sizeInBytes
val msgPerSeg = 10
-val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
val config = logConfig.copy(segmentSize = segmentSize)
val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)

kafka/server/LogOffsetTest.scala View File

@@ -92,7 +92,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
log.flush()
val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
-assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets)
waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
val topicAndPartition = TopicAndPartition(topic, part)
@@ -101,7 +101,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
replicaId = 0)
val consumerOffsets =
simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), consumerOffsets)
// try to fetch using latest offset
val fetchResponse = simpleConsumer.fetch(
@@ -155,14 +155,14 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
-assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets)
waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
val topicAndPartition = TopicAndPartition(topic, part)
val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
val consumerOffsets =
simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), consumerOffsets)
}
@Test
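The expected offset lists change because, with this test's segment size sitting between three and four message sizes (implied by the old and new assertions together), the new roll condition caps each segment at three messages instead of four. fetchOffsets reports the segment base offsets plus the log end offset 20, newest first. A sketch of the boundaries, assuming 20 equal-sized messages:

// Segment base offsets plus log end offset, newest first.
object OffsetBoundarySketch extends App {
  def offsetsNewestFirst(msgsPerSegment: Int, total: Int): Seq[Long] =
    ((0 until total by msgsPerSegment).map(_.toLong) :+ total.toLong).reverse

  assert(offsetsNewestFirst(4, 20) == Seq(20L, 16L, 12L, 8L, 4L, 0L))          // old: 4 per segment
  assert(offsetsNewestFirst(3, 20) == Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L)) // new: 3 per segment
}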