From 7a5f0cfaefb57cefa70c81d63c60316075c6dd97 Mon Sep 17 00:00:00 2001 From: Alok Nikhil Date: Fri, 8 Apr 2022 12:54:09 -0700 Subject: [PATCH] MINOR: Fix DescribeLogDirs API error handling for older API versions (#12017) With KAFKA-13527 / KIP-784 we introduced a new top-level error code for the DescribeLogDirs API for versions 3 and above. However, the change regressed the error handling for versions less than 3 since the response converter fails to write the non-zero error code out (rightly) for versions lower than 3 and drops the response to the client which eventually times out instead of receiving an empty log dirs response and processing that as a Cluster Auth failure. With this change, the API conditionally propagates the error code out to the client if the request API version is 3 and above. This keeps the semantics of the error handling the same for all versions and restores the behavior for older versions. See current behavior in the broker log: ```bash ERROR] 2022-04-08 01:22:56,406 [data-plane-kafka-request-handler-10] kafka.server.KafkaApis - [KafkaApi-0] Unexpected error handling request RequestHeader(apiKey=DESCRIBE_LOG_DIRS, apiVersion=0, clientId=sarama, correlationId=1) -- DescribeLogDirsRequestData(topics=null) org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0 [ERROR] 2022-04-08 01:22:56,407 [data-plane-kafka-request-handler-10] kafka.server.KafkaRequestHandler - [Kafka Request Handler 10 on Broker 0], Exception when handling request org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0 ``` Reviewers: Ismael Juma --- .../message/DescribeLogDirsResponse.json | 3 +- .../network/RequestConvertToJsonTest.scala | 30 +++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json index 0171a16481f..c79e756aada 100644 --- a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json +++ b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json @@ -25,7 +25,8 @@ "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, - { "name": "ErrorCode", "type": "int16", "versions": "3+", "about": "The error code, or 0 if there was no error." }, + { "name": "ErrorCode", "type": "int16", "versions": "3+", + "ignorable": true, "about": "The error code, or 0 if there was no error." }, { "name": "Results", "type": "[]DescribeLogDirsResult", "versions": "0+", "about": "The log directories.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", diff --git a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala index 09dceac1510..0ce8448a4f2 100644 --- a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala +++ b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala @@ -19,14 +19,13 @@ package kafka.network import java.net.InetAddress import java.nio.ByteBuffer - import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode} import kafka.network import kafka.network.RequestConvertToJson.requestHeaderNode import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message._ import org.apache.kafka.common.network.{ClientInformation, ListenerName, NetworkSend} -import org.apache.kafka.common.protocol.{ApiKeys, MessageUtil} +import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.junit.jupiter.api.Assertions.assertEquals @@ -61,6 +60,33 @@ class RequestConvertToJsonTest { assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled request keys") } + @Test + def testAllApiVersionsResponseHandled(): Unit = { + + ApiKeys.values().foreach { key => { + val unhandledVersions = ArrayBuffer[java.lang.Short]() + key.allVersions().forEach { version => { + val message = key match { + // Specify top-level error handling for verifying compatibility across versions + case ApiKeys.DESCRIBE_LOG_DIRS => + ApiMessageType.fromApiKey(key.id).newResponse().asInstanceOf[DescribeLogDirsResponseData] + .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()) + case _ => + ApiMessageType.fromApiKey(key.id).newResponse() + } + + val bytes = MessageUtil.toByteBuffer(message, version) + val response = AbstractResponse.parseResponse(key, bytes, version) + try { + RequestConvertToJson.response(response, version) + } catch { + case _ : IllegalStateException => unhandledVersions += version + }} + } + assertEquals(ArrayBuffer.empty, unhandledVersions, s"API: ${key.toString} - Unhandled request versions") + }} + } + @Test def testAllResponseTypesHandled(): Unit = { val unhandledKeys = ArrayBuffer[String]()