diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index f9de7ba784c..d1fc345bbd6 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -83,7 +83,6 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
     lastFetchLeaderLogEndOffset = leaderEndOffset
     lastFetchTimeMs = followerFetchTimeMs
     updateLastSentHighWatermark(lastSentHighwatermark)
-    trace(s"Updated state of replica to $this")
   }
 
   /**
@@ -96,7 +95,6 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
    */
   private def updateLastSentHighWatermark(highWatermark: Long): Unit = {
     _lastSentHighWatermark = highWatermark
-    trace(s"Updated HW of replica to $highWatermark")
   }
 
   def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index a42f6891a48..ec6d75c9eb4 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -149,6 +149,7 @@ class ReplicaFetcherThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                     partitionData: FetchData): Option[LogAppendInfo] = {
+    val logTrace = isTraceEnabled
     val partition = replicaMgr.nonOfflinePartition(topicPartition).get
     val log = partition.localLogOrException
     val records = toMemoryRecords(partitionData.records)
@@ -159,14 +160,14 @@ class ReplicaFetcherThread(name: String,
       throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, log.logEndOffset))
 
-    if (isTraceEnabled)
+    if (logTrace)
       trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
         .format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
     val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
 
-    if (isTraceEnabled)
+    if (logTrace)
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
         .format(log.logEndOffset, records.sizeInBytes, topicPartition))
     val leaderLogStartOffset = partitionData.logStartOffset
@@ -175,7 +176,7 @@ class ReplicaFetcherThread(name: String,
     // These values will be computed upon becoming leader or handling a preferred read replica fetch.
     val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark)
     log.maybeIncrementLogStartOffset(leaderLogStartOffset)
-    if (isTraceEnabled)
+    if (logTrace)
       trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")
 
     // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 96591722772..4d917d58a07 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -842,7 +842,7 @@ class ReplicaManager(val config: KafkaConfig,
                                origin: AppendOrigin,
                                entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
-
+    val traceEnabled = isTraceEnabled
     def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
       val logStartOffset = getPartition(topicPartition) match {
         case HostedPartition.Online(partition) => partition.logStartOffset
@@ -855,7 +855,9 @@ class ReplicaManager(val config: KafkaConfig,
       logStartOffset
     }
 
-    trace(s"Append [$entriesPerPartition] to local log")
+    if (traceEnabled)
+      trace(s"Append [$entriesPerPartition] to local log")
+
     entriesPerPartition.map { case (topicPartition, records) =>
       brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
       brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
@@ -877,8 +879,10 @@ class ReplicaManager(val config: KafkaConfig,
           brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
           brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)
 
-          trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
-            s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
+          if (traceEnabled)
+            trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
+              s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
+
           (topicPartition, LogAppendResult(info))
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known exceptions
@@ -1024,6 +1028,7 @@ class ReplicaManager(val config: KafkaConfig,
                       readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                       quota: ReplicaQuota,
                       clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
+    val traceEnabled = isTraceEnabled
    def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
      val offset = fetchInfo.fetchOffset
      val partitionFetchSize = fetchInfo.maxBytes
@@ -1035,9 +1040,10 @@ class ReplicaManager(val config: KafkaConfig,
       val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
       try {
-        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
-          s"remaining response limit $limitBytes" +
-          (if (minOneMessage) s", ignoring response/partition size limits" else ""))
+        if (traceEnabled)
+          trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
+            s"remaining response limit $limitBytes" +
+            (if (minOneMessage) s", ignoring response/partition size limits" else ""))
 
         val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
         val fetchTimeMs = time.milliseconds
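
The change is the same in all three files: hoist the `isTraceEnabled` check into a local `val` at the top of a hot method, then guard each `trace(...)` call with it. This avoids re-reading the logger's level for every partition or record batch and, more importantly, skips building the interpolated message string when trace logging is off. Below is a minimal, self-contained sketch of the pattern using a plain SLF4J logger rather than Kafka's `Logging` trait; `PartitionProcessor` and `processPartitions` are hypothetical names for illustration.

```scala
import org.slf4j.{Logger, LoggerFactory}

// Hypothetical stand-in for a hot path such as processPartitionData or
// readFromLocalLog, which run once per partition per fetch request.
object PartitionProcessor {
  private val logger: Logger = LoggerFactory.getLogger(getClass)

  def processPartitions(partitions: Seq[String]): Unit = {
    // Read the log level once per call instead of once per partition.
    // The level check typically involves a volatile read in the logging
    // backend, so caching it keeps that cost out of the loop.
    val traceEnabled = logger.isTraceEnabled

    partitions.foreach { tp =>
      // The guard also prevents the s"..." interpolation from allocating
      // a new String when trace logging is disabled, which is the common
      // case in production.
      if (traceEnabled)
        logger.trace(s"Processing partition $tp")
      // ... actual per-partition work ...
    }
  }
}
```

The trade-off is that a level change made while the method is running is not picked up until the next call, which is harmless for trace-level diagnostics.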