mirror of https://github.com/apache/kafka.git
				
				
				
			MINOR: Fix DescribeLogDirs API error handling for older API versions (#12017)
With KAFKA-13527 / KIP-784 we introduced a new top-level error code for the DescribeLogDirs API for versions 3 and above. However, the change regressed the error handling for versions less than 3 since the response converter fails to write the non-zero error code out (rightly) for versions lower than 3 and drops the response to the client which eventually times out instead of receiving an empty log dirs response and processing that as a Cluster Auth failure. With this change, the API conditionally propagates the error code out to the client if the request API version is 3 and above. This keeps the semantics of the error handling the same for all versions and restores the behavior for older versions. See current behavior in the broker log: ```bash ERROR] 2022-04-08 01:22:56,406 [data-plane-kafka-request-handler-10] kafka.server.KafkaApis - [KafkaApi-0] Unexpected error handling request RequestHeader(apiKey=DESCRIBE_LOG_DIRS, apiVersion=0, clientId=sarama, correlationId=1) -- DescribeLogDirsRequestData(topics=null) org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0 [ERROR] 2022-04-08 01:22:56,407 [data-plane-kafka-request-handler-10] kafka.server.KafkaRequestHandler - [Kafka Request Handler 10 on Broker 0], Exception when handling request org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0 ``` Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
		
							parent
							
								
									ce4f2ad606
								
							
						
					
					
						commit
						7a5f0cfaef
					
				|  | @ -25,7 +25,8 @@ | |||
|   "fields": [ | ||||
|     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", | ||||
|       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, | ||||
|     { "name": "ErrorCode", "type": "int16", "versions": "3+", "about": "The error code, or 0 if there was no error." }, | ||||
|     { "name": "ErrorCode", "type": "int16", "versions": "3+", | ||||
|       "ignorable": true, "about": "The error code, or 0 if there was no error." }, | ||||
|     { "name": "Results", "type": "[]DescribeLogDirsResult", "versions": "0+", | ||||
|       "about": "The log directories.", "fields": [ | ||||
|       { "name": "ErrorCode", "type": "int16", "versions": "0+", | ||||
|  |  | |||
|  | @ -19,14 +19,13 @@ package kafka.network | |||
| 
 | ||||
| import java.net.InetAddress | ||||
| import java.nio.ByteBuffer | ||||
| 
 | ||||
| import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode} | ||||
| import kafka.network | ||||
| import kafka.network.RequestConvertToJson.requestHeaderNode | ||||
| import org.apache.kafka.common.memory.MemoryPool | ||||
| import org.apache.kafka.common.message._ | ||||
| import org.apache.kafka.common.network.{ClientInformation, ListenerName, NetworkSend} | ||||
| import org.apache.kafka.common.protocol.{ApiKeys, MessageUtil} | ||||
| import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil} | ||||
| import org.apache.kafka.common.requests._ | ||||
| import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} | ||||
| import org.junit.jupiter.api.Assertions.assertEquals | ||||
|  | @ -61,6 +60,33 @@ class RequestConvertToJsonTest { | |||
|     assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled request keys") | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testAllApiVersionsResponseHandled(): Unit = { | ||||
| 
 | ||||
|     ApiKeys.values().foreach { key => { | ||||
|       val unhandledVersions = ArrayBuffer[java.lang.Short]() | ||||
|       key.allVersions().forEach { version => { | ||||
|         val message = key match { | ||||
|           // Specify top-level error handling for verifying compatibility across versions | ||||
|           case ApiKeys.DESCRIBE_LOG_DIRS => | ||||
|             ApiMessageType.fromApiKey(key.id).newResponse().asInstanceOf[DescribeLogDirsResponseData] | ||||
|               .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()) | ||||
|           case _ => | ||||
|             ApiMessageType.fromApiKey(key.id).newResponse() | ||||
|         } | ||||
| 
 | ||||
|         val bytes = MessageUtil.toByteBuffer(message, version) | ||||
|         val response = AbstractResponse.parseResponse(key, bytes, version) | ||||
|         try { | ||||
|           RequestConvertToJson.response(response, version) | ||||
|         } catch { | ||||
|           case _ : IllegalStateException => unhandledVersions += version | ||||
|         }} | ||||
|       } | ||||
|       assertEquals(ArrayBuffer.empty, unhandledVersions, s"API: ${key.toString} - Unhandled request versions") | ||||
|     }} | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testAllResponseTypesHandled(): Unit = { | ||||
|     val unhandledKeys = ArrayBuffer[String]() | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue