KAFKA-3580; Improve error logging in ReplicaFetchThread

Author: Manikumar reddy O <manikumar.reddy@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1237 from omkreddy/KAFKA-3580
This commit is contained in:
Manikumar reddy O 2016-04-27 06:53:16 -07:00 committed by Ismael Juma
parent 77fa0b116f
commit 2885bc33da
1 changed files with 6 additions and 6 deletions

View File

@ -115,10 +115,10 @@ class ReplicaFetcherThread(name: String,
val TopicAndPartition(topic, partitionId) = topicAndPartition
val replica = replicaMgr.getReplica(topic, partitionId).get
val messageSet = partitionData.toByteBufferMessageSet
warnIfMessageOversized(messageSet)
warnIfMessageOversized(messageSet, topicAndPartition)
if (fetchOffset != replica.logEndOffset.messageOffset)
throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicAndPartition, fetchOffset, replica.logEndOffset.messageOffset))
if (logger.isTraceEnabled)
trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
.format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.highWatermark))
@ -136,15 +136,15 @@ class ReplicaFetcherThread(name: String,
.format(replica.brokerId, topic, partitionId, followerHighWatermark))
} catch {
case e: KafkaStorageException =>
fatal("Disk error while replicating data.", e)
fatal(s"Disk error while replicating data for $topicAndPartition", e)
Runtime.getRuntime.halt(1)
}
}
def warnIfMessageOversized(messageSet: ByteBufferMessageSet): Unit = {
def warnIfMessageOversized(messageSet: ByteBufferMessageSet, topicAndPartition: TopicAndPartition): Unit = {
if (messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0)
error("Replication is failing due to a message that is greater than replica.fetch.max.bytes. This " +
"generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicAndPartition. " +
"This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
"message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
"equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
}