From 988d4d8e65a14390abd748318a64e281e4a37c19 Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Tue, 30 Apr 2013 17:20:54 -0700 Subject: [PATCH] KAFKA-892 Change request log to include request completion not handling; reviewed by Joel Koshy --- core/src/main/scala/kafka/network/RequestChannel.scala | 7 +++++-- core/src/main/scala/kafka/server/KafkaApis.scala | 4 +--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index c8f81c00cdf..7b8d1f060f0 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,6 +26,7 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ +import org.apache.log4j.Logger object RequestChannel extends Logging { @@ -47,6 +48,7 @@ object RequestChannel extends Logging { val requestId = buffer.getShort() val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) buffer = null + private val requestLogger = Logger.getLogger("kafka.request.logger") trace("Received request : %s".format(requestObj)) def updateRequestMetrics() { @@ -76,8 +78,9 @@ object RequestChannel extends Logging { m.responseSendTimeHist.update(responseSendTime) m.totalTimeHist.update(totalTime) } - trace("Completed request : %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d" - .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime)) + if(requestLogger.isTraceEnabled) + requestLogger.trace("Completed request:%s from client %s;totalTime:%d,queueTime:%d,localTime:%d,remoteTime:%d,sendTime:%d" + .format(requestObj, remoteAddress, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime)) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bb178d6e47b..f5288bf1da9 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -46,7 +46,6 @@ class KafkaApis(val requestChannel: RequestChannel, new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) private val delayedRequestMetrics = new DelayedRequestMetrics - private val requestLogger = Logger.getLogger("kafka.request.logger") this.logIdent = "[KafkaApi-%d] ".format(brokerId) /** @@ -54,8 +53,7 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handle(request: RequestChannel.Request) { try{ - if(requestLogger.isTraceEnabled) - requestLogger.trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) + trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) request.requestId match { case RequestKeys.ProduceKey => handleProducerRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request)