mirror of https://github.com/apache/kafka.git
MINOR: reduce impact of trace logging in replica hot path (#8468)
The impact of trace logging is normally small, on the order of 40ns per getEffectiveLevel check; however, this adds up when trace is called multiple times per partition in the replica fetch hot path. This PR removes some trace logs that are not very useful and reduces the cases where the log level is checked over and over for a single fetch request.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
This commit is contained in:
parent 0f8dc1fcd7
commit 851b45c842
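The pattern behind the change, as a minimal sketch (the object name, the trace helper, and handleFetch below are illustrative stand-ins, not Kafka's actual Logging trait or fetch path): read isTraceEnabled once per request and reuse the cached boolean inside the per-partition loop, so only one effective-level lookup happens per fetch instead of one per trace call.

import org.slf4j.{Logger, LoggerFactory}

object TraceHoistingSketch {
  private val logger: Logger = LoggerFactory.getLogger(getClass)

  // Message is passed by name, so the string is only built when trace is actually on.
  private def trace(msg: => String): Unit =
    if (logger.isTraceEnabled) logger.trace(msg)

  // Hypothetical per-request handler: one level check per request, not one per partition.
  def handleFetch(partitions: Seq[String]): Unit = {
    val traceEnabled = logger.isTraceEnabled   // single effective-level lookup
    partitions.foreach { tp =>
      // ... real per-partition fetch work would go here ...
      if (traceEnabled)                        // cheap local boolean read in the hot path
        trace(s"Processed partition $tp")
    }
  }
}

The guarded calls in the diff below (if (logTrace) trace(...), if (traceEnabled) trace(...)) follow the same shape.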
core/src/main/scala/kafka/cluster/Replica.scala

@@ -83,7 +83,6 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
     lastFetchLeaderLogEndOffset = leaderEndOffset
     lastFetchTimeMs = followerFetchTimeMs
     updateLastSentHighWatermark(lastSentHighwatermark)
-    trace(s"Updated state of replica to $this")
   }
 
   /**
@@ -96,7 +95,6 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
    */
   private def updateLastSentHighWatermark(highWatermark: Long): Unit = {
     _lastSentHighWatermark = highWatermark
-    trace(s"Updated HW of replica to $highWatermark")
   }
 
   def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = {
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

@@ -149,6 +149,7 @@ class ReplicaFetcherThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                     partitionData: FetchData): Option[LogAppendInfo] = {
+    val logTrace = isTraceEnabled
     val partition = replicaMgr.nonOfflinePartition(topicPartition).get
     val log = partition.localLogOrException
     val records = toMemoryRecords(partitionData.records)
@@ -159,14 +160,14 @@ class ReplicaFetcherThread(name: String,
       throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, log.logEndOffset))
 
-    if (isTraceEnabled)
+    if (logTrace)
       trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
         .format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
     val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
 
-    if (isTraceEnabled)
+    if (logTrace)
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
         .format(log.logEndOffset, records.sizeInBytes, topicPartition))
     val leaderLogStartOffset = partitionData.logStartOffset
@@ -175,7 +176,7 @@ class ReplicaFetcherThread(name: String,
     // These values will be computed upon becoming leader or handling a preferred read replica fetch.
     val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)
     log.maybeIncrementLogStartOffset(leaderLogStartOffset)
-    if (isTraceEnabled)
+    if (logTrace)
       trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
 
     // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
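One point worth noting about the guards above: Kafka's Logging helpers take the message by name, so the interpolated string is already not built when trace is off; what the repeated calls still pay for is the level check itself. A toy illustration of that distinction (traceOn, levelChecks and the trace helper here are made up for the example, not Kafka code):

object ByNameLevelCheckSketch {
  @volatile var traceOn = false     // stand-in for the logger's level state
  var levelChecks = 0               // counts how often the level is consulted

  // By-name parameter: msg is only evaluated if the level check passes.
  def trace(msg: => String): Unit = {
    levelChecks += 1                // the check still runs on every call
    if (traceOn) println(msg)
  }

  def main(args: Array[String]): Unit = {
    // Even with trace disabled, 1000 calls mean 1000 level checks;
    // caching the flag once per request reduces that to a single check.
    (1 to 1000).foreach(i => trace(s"message $i"))
    println(s"level checks performed: $levelChecks")   // prints 1000
  }
}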
core/src/main/scala/kafka/server/ReplicaManager.scala

@@ -842,7 +842,7 @@ class ReplicaManager(val config: KafkaConfig,
                                origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
+    val traceEnabled = isTraceEnabled
     def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
       val logStartOffset = getPartition(topicPartition) match {
         case HostedPartition.Online(partition) => partition.logStartOffset
@@ -855,7 +855,9 @@ class ReplicaManager(val config: KafkaConfig,
           logStartOffset
       }
 
-    trace(s"Append [$entriesPerPartition] to local log")
+    if (traceEnabled)
+      trace(s"Append [$entriesPerPartition] to local log")
+
     entriesPerPartition.map { case (topicPartition, records) =>
       brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
       brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
@@ -877,8 +879,10 @@ class ReplicaManager(val config: KafkaConfig,
           brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
           brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)
 
-          trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
-            s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
+          if (traceEnabled)
+            trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
+              s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
+
           (topicPartition, LogAppendResult(info))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
@@ -1024,6 +1028,7 @@ class ReplicaManager(val config: KafkaConfig,
                        readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                        quota: ReplicaQuota,
                        clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
+    val traceEnabled = isTraceEnabled
 
     def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
       val offset = fetchInfo.fetchOffset
@@ -1035,9 +1040,10 @@ class ReplicaManager(val config: KafkaConfig,
 
       val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
       try {
-        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
-          s"remaining response limit $limitBytes" +
-          (if (minOneMessage) s", ignoring response/partition size limits" else ""))
+        if (traceEnabled)
+          trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
+            s"remaining response limit $limitBytes" +
+            (if (minOneMessage) s", ignoring response/partition size limits" else ""))
 
         val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
         val fetchTimeMs = time.milliseconds
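The ~40ns-per-check figure in the commit message can be sanity-checked with a rough timing loop like the sketch below (not a JMH benchmark, so treat the output as an order-of-magnitude estimate; the logger name is arbitrary):

import org.slf4j.LoggerFactory

object LevelCheckCostSketch {
  def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getLogger("kafka.server.ReplicaManager")
    val iterations = 10 * 1000 * 1000

    // Warm-up pass so the JIT compiles the loop before measuring.
    var enabled = false
    (1 to iterations).foreach(_ => enabled ^= logger.isTraceEnabled)

    val start = System.nanoTime()
    (1 to iterations).foreach(_ => enabled ^= logger.isTraceEnabled)
    val elapsedNanos = System.nanoTime() - start

    // Printing `enabled` keeps the JIT from discarding the checks entirely.
    println(s"~${elapsedNanos / iterations}ns per isTraceEnabled check (xor of results: $enabled)")
  }
}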