KAFKA-4384; ReplicaFetcherThread stops after receiving a corrupted message

Author: Jun He <jun.he@airbnb.com>

Reviewers: Jiangjie (Becket) Qin <becket.qin@gmail.com>, Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2127 from jun-he/KAFKA-4384
Jun He authored 2016-11-24 13:10:19 +00:00, committed by Ismael Juma
parent e035fc0395
commit 3e3b7a010b
2 changed files with 101 additions and 16 deletions
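The heart of the change: previously, `AbstractFetcherThread` advanced the partition's fetch state before handing the data to `processPartitionData`, so a batch that later failed its CRC check had already moved the fetch offset. The patch computes the follow-up offset first but commits it only after processing succeeds. A minimal sketch of the reordered happy path (simplified; `onSuccessfulFetch` is a hypothetical name, the real logic is in the diff below):

```scala
// Sketch only: mirrors the patched flow in AbstractFetcherThread.
def onSuccessfulFetch(tp: TopicPartition, data: PD, fetchOffset: Long): Unit = {
  val messages = data.toByteBufferMessageSet
  // Compute the next offset, but do NOT record it yet.
  val newOffset = messages.shallowIterator.toSeq.lastOption
    .map(_.nextOffset).getOrElse(fetchOffset)

  // May throw CorruptRecordException; in that case the state below is never
  // updated, so the corrupted range is re-fetched instead of being skipped.
  processPartitionData(tp, fetchOffset, data)

  if (messages.validBytes > 0) {
    // Only now, after processing succeeded, advance the fetch state.
    partitionStates.updateAndMoveToEnd(tp, new PartitionFetchState(newOffset))
    fetcherStats.byteRate.mark(messages.validBytes)
  }
}
```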

core/src/main/scala/kafka/server/AbstractFetcherThread.scala

@@ -145,18 +145,19 @@ abstract class AbstractFetcherThread(name: String,
           case Errors.NONE =>
             try {
               val messages = partitionData.toByteBufferMessageSet
-              val newOffset = messages.shallowIterator.toSeq.lastOption match {
-                case Some(m) =>
-                  partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(m.nextOffset))
-                  fetcherStats.byteRate.mark(messages.validBytes)
-                  m.nextOffset
-                case None =>
-                  currentPartitionFetchState.offset
-              }
+              val newOffset = messages.shallowIterator.toSeq.lastOption.map(_.nextOffset).getOrElse(
+                currentPartitionFetchState.offset)

               fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
               // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
               processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
+
+              val validBytes = messages.validBytes
+              if (validBytes > 0) {
+                // Update partitionStates only if there is no exception during processPartitionData
+                partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
+                fetcherStats.byteRate.mark(validBytes)
+              }
             } catch {
               case ime: CorruptRecordException =>
                 // we log the error and continue. This ensures two things
@@ -164,6 +165,7 @@ abstract class AbstractFetcherThread(name: String,
                 // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                 // should get fixed in the subsequent fetches
                 logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
+                updatePartitionsWithError(topicPartition)
             case e: Throwable =>
               throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                 .format(topic, partitionId, currentPartitionFetchState.offset), e)
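The other half of the fix is the new `updatePartitionsWithError(topicPartition)` call: rather than spinning on the corrupted offset, the partition is routed to `handlePartitionsWithErrors`, which the test below implements as `delayPartitions(partitions, fetchBackOffMs)`; a delayed partition is skipped by `buildFetchRequest` until its backoff expires. A self-contained model of that behaviour, with `FetchState`/`BackoffModel` as hypothetical stand-ins for Kafka's `PartitionFetchState`/`PartitionStates`:

```scala
import scala.collection.mutable

// Hypothetical stand-in for PartitionFetchState: an offset plus a due time.
final case class FetchState(offset: Long, dueTimeMs: Long = 0L) {
  def isActive(nowMs: Long): Boolean = nowMs >= dueTimeMs
}

// Hypothetical stand-in for the fetcher's partition-state bookkeeping.
final class BackoffModel(fetchBackOffMs: Long) {
  private val states = mutable.Map.empty[String, FetchState]

  def addPartition(tp: String, offset: Long): Unit =
    states(tp) = FetchState(offset)

  // Mirrors updatePartitionsWithError -> delayPartitions: push the retry into the future.
  def delayPartition(tp: String, nowMs: Long): Unit =
    states.get(tp).foreach(s => states(tp) = s.copy(dueTimeMs = nowMs + fetchBackOffMs))

  // Mirrors the test's buildFetchRequest: only active partitions make it into the request.
  def buildFetchOffsets(nowMs: Long): Map[String, Long] =
    states.collect { case (tp, s) if s.isActive(nowMs) => tp -> s.offset }.toMap
}
```

With `fetchBackOffMs = 1` as in the test, the corrupted partition drops out of the requests built during its 1 ms delay and then reappears at the same offset.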

core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala

@@ -19,16 +19,17 @@ package kafka.server

 import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
-import kafka.message.ByteBufferMessageSet
+import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
 import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Utils
 import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.{Before, Test}

 import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{mutable, Map}

 class AbstractFetcherThreadTest {
@@ -90,10 +91,10 @@ class AbstractFetcherThreadTest {
     override def offset(topicAndPartition: TopicPartition): Long = offsets(topicAndPartition)
   }

-  class DummyPartitionData extends PartitionData {
+  class TestPartitionData(byteBufferMessageSet: ByteBufferMessageSet) extends PartitionData {

     override def errorCode: Short = Errors.NONE.code

-    override def toByteBufferMessageSet: ByteBufferMessageSet = new ByteBufferMessageSet()
+    override def toByteBufferMessageSet: ByteBufferMessageSet = byteBufferMessageSet

     override def highWatermark: Long = 0L
@ -102,8 +103,9 @@ class AbstractFetcherThreadTest {
class DummyFetcherThread(name: String,
clientId: String,
sourceBroker: BrokerEndPoint)
extends AbstractFetcherThread(name, clientId, sourceBroker, 0) {
sourceBroker: BrokerEndPoint,
fetchBackOffMs: Int = 0)
extends AbstractFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
type REQ = DummyFetchRequest
type PD = PartitionData
@@ -116,11 +118,92 @@ class AbstractFetcherThreadTest {

     override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit = {}

-    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, DummyPartitionData)] =
-      fetchRequest.offsets.mapValues(_ => new DummyPartitionData).toSeq
+    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] =
+      fetchRequest.offsets.mapValues(_ => new TestPartitionData(new ByteBufferMessageSet())).toSeq

     override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest =
       new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.offset) }.toMap)
   }

+  @Test
+  def testFetchRequestCorruptedMessageException() {
+    val partition = new TopicPartition("topic", 0)
+    val fetcherThread = new CorruptingFetcherThread("test", "client", new BrokerEndPoint(0, "localhost", 9092),
+      fetchBackOffMs = 1)
+    fetcherThread.start()
+
+    // Add one partition for fetching
+    fetcherThread.addPartitions(Map(partition -> 0L))
+
+    // Wait until fetcherThread finishes the work
+    TestUtils.waitUntilTrue(() => fetcherThread.fetchCount > 3, "Failed waiting for fetcherThread to finish the work")
+
+    fetcherThread.shutdown()
+
+    // The fetcherThread should have fetched two normal messages
+    assertTrue(fetcherThread.logEndOffset == 2)
+  }
+
+  class CorruptingFetcherThread(name: String,
+                                clientId: String,
+                                sourceBroker: BrokerEndPoint,
+                                fetchBackOffMs: Int = 0)
+    extends DummyFetcherThread(name, clientId, sourceBroker, fetchBackOffMs) {
+
+    @volatile var logEndOffset = 0L
+    @volatile var fetchCount = 0
+
+    private val normalPartitionDataSet = List(
+      new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(0L), new Message("hello".getBytes))),
+      new TestPartitionData(new ByteBufferMessageSet(NoCompressionCodec, Seq(1L), new Message("hello".getBytes)))
+    )
+
+    override def processPartitionData(topicAndPartition: TopicPartition,
+                                      fetchOffset: Long,
+                                      partitionData: PartitionData): Unit = {
+      // Throw exception if the fetchOffset does not match the fetcherThread partition state
+      if (fetchOffset != logEndOffset)
+        throw new RuntimeException(
+          "Offset mismatch for partition %s: fetched offset = %d, log end offset = %d."
+            .format(topicAndPartition, fetchOffset, logEndOffset))
+
+      // Now check message's crc
+      val messages = partitionData.toByteBufferMessageSet
+      for (messageAndOffset <- messages.shallowIterator) {
+        messageAndOffset.message.ensureValid()
+        logEndOffset = messageAndOffset.nextOffset
+      }
+    }
+
+    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] = {
+      fetchCount += 1
+      // Set the first fetch to get a corrupted message
+      if (fetchCount == 1) {
+        val corruptedMessage = new Message("hello".getBytes)
+        val badChecksum = ((corruptedMessage.checksum + 1) % Int.MaxValue).toInt
+        // Garble checksum
+        Utils.writeUnsignedInt(corruptedMessage.buffer, Message.CrcOffset, badChecksum)
+        val byteBufferMessageSet = new ByteBufferMessageSet(NoCompressionCodec, corruptedMessage)
+        fetchRequest.offsets.mapValues(_ => new TestPartitionData(byteBufferMessageSet)).toSeq
+      } else
+        // Then, the following fetches get the normal data
+        fetchRequest.offsets.mapValues(v => normalPartitionDataSet(v.toInt)).toSeq
+    }
+
+    override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): DummyFetchRequest = {
+      val requestMap = new mutable.HashMap[TopicPartition, Long]
+      partitionMap.foreach { case (topicPartition, partitionFetchState) =>
+        // Add backoff delay check
+        if (partitionFetchState.isActive)
+          requestMap.put(topicPartition, partitionFetchState.offset)
+      }
+      new DummyFetchRequest(requestMap)
+    }
+
+    override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) = delayPartitions(partitions, fetchBackOffMs.toLong)
+  }
 }
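For context on why the test's `Utils.writeUnsignedInt(..., Message.CrcOffset, badChecksum)` trick works: a `Message` stores a CRC32 of the record contents at a fixed offset, and `ensureValid()` recomputes and compares it, raising a corrupt-record error on mismatch. A toy illustration of the same check with plain `java.util.zip.CRC32` (the 4-byte-CRC-then-payload layout here is illustrative, not Kafka's actual wire format):

```scala
import java.nio.ByteBuffer
import java.util.zip.CRC32

object CrcCheckDemo extends App {
  // Toy record layout: [4-byte crc][payload].
  def encode(payload: Array[Byte]): ByteBuffer = {
    val crc = new CRC32()
    crc.update(payload)
    val buf = ByteBuffer.allocate(4 + payload.length)
    buf.putInt(crc.getValue.toInt).put(payload)
    buf.flip()
    buf
  }

  // Recompute the CRC over the payload and compare with the stored value.
  def ensureValid(buf: ByteBuffer): Unit = {
    val crc = new CRC32()
    crc.update(buf.array(), 4, buf.limit() - 4)
    require(buf.getInt(0) == crc.getValue.toInt, "CRC mismatch: record is corrupt")
  }

  val record = encode("hello".getBytes)
  ensureValid(record)                    // passes

  record.putInt(0, record.getInt(0) + 1) // garble the checksum, as the test does
  try ensureValid(record)
  catch { case e: IllegalArgumentException => println(s"caught: ${e.getMessage}") }
}
```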