diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java index 5bcce29a888..f63c3d98e1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java @@ -108,6 +108,10 @@ public class RequestHeader implements AbstractRequestResponse { return size; } + public boolean isApiVersionDeprecated() { + return apiKey().isVersionDeprecated(apiVersion()); + } + public ResponseHeader toResponseHeader() { return new ResponseHeader(data.correlationId(), apiKey().responseHeaderVersion(apiVersion())); } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 44f5e926eb9..aba46474867 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -48,7 +48,12 @@ object RequestChannel extends Logging { private val ResponseQueueSizeMetric = "ResponseQueueSize" val ProcessorMetricTag = "processor" - private def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled + /** + * Deprecated protocol apis are logged at info level while the rest are logged at debug level. + * That makes it possible to enable the former without enabling latter. + */ + private def isRequestLoggingEnabled(header: RequestHeader): Boolean = requestLogger.underlying.isDebugEnabled || + (requestLogger.underlying.isInfoEnabled && header.isApiVersionDeprecated()) sealed trait BaseRequest case object ShutdownRequest extends BaseRequest @@ -84,7 +89,7 @@ object RequestChannel extends Logging { // This is constructed on creation of a Request so that the JSON representation is computed before the request is // processed by the api layer. Otherwise, a ProduceRequest can occur without its data (ie. it goes into purgatory). val requestLog: Option[JsonNode] = - if (RequestChannel.isRequestLoggingEnabled) Some(RequestConvertToJson.request(loggableRequest)) + if (RequestChannel.isRequestLoggingEnabled(context.header)) Some(RequestConvertToJson.request(loggableRequest)) else None def header: RequestHeader = context.header @@ -128,7 +133,7 @@ object RequestChannel extends Logging { } def responseNode(response: AbstractResponse): Option[JsonNode] = { - if (RequestChannel.isRequestLoggingEnabled) + if (RequestChannel.isRequestLoggingEnabled(context.header)) Some(RequestConvertToJson.response(response, context.apiVersion)) else None @@ -249,14 +254,19 @@ object RequestChannel extends Logging { // the total time spent on authentication, which may be significant for SASL/SSL. recordNetworkThreadTimeCallback.foreach(record => record.accept(networkThreadTimeNanos)) - if (isRequestLoggingEnabled) { + if (isRequestLoggingEnabled(header)) { val desc = RequestConvertToJson.requestDescMetrics(header, requestLog.toJava, response.responseLog.toJava, context, session, isForwarded, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, responseSendTimeMs, temporaryMemoryBytes, messageConversionsTimeMs) - requestLogger.debug("Completed request:" + desc.toString) + val logPrefix = "Completed request: {}" + // log deprecated apis at `info` level to allow them to be selectively enabled + if (header.isApiVersionDeprecated()) + requestLogger.info(logPrefix, desc) + else + requestLogger.debug(logPrefix, desc) } } diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index f7b26601f7c..b4b0cadb16d 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -776,7 +776,7 @@ public class RequestConvertToJson { header.data(), header.headerVersion(), false ); node.set("requestApiKeyName", new TextNode(header.apiKey().toString())); - if (header.apiKey().isVersionDeprecated(header.apiVersion())) { + if (header.isApiVersionDeprecated()) { node.set("requestApiVersionDeprecated", BooleanNode.TRUE); } return node;