diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java index e2727f5358d..30351c64b4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java @@ -23,12 +23,14 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.common.utils.Utils; import java.util.ArrayList; import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; /** @@ -123,21 +125,18 @@ public class NodeApiVersions { * Get the latest version supported by the broker within an allowed range of versions */ public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) { - ApiVersion usableVersion = supportedVersions.get(apiKey); - if (usableVersion == null) + if (!supportedVersions.containsKey(apiKey)) throw new UnsupportedVersionException("The broker does not support " + apiKey); - return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion); - } + ApiVersion supportedVersion = supportedVersions.get(apiKey); + Optional intersectVersion = supportedVersion.intersect( + new ApiVersion(apiKey.id, oldestAllowedVersion, latestAllowedVersion)); - private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions, - short minAllowedVersion, short maxAllowedVersion) { - short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion); - short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion); - if (minVersion > maxVersion) + if (intersectVersion.isPresent()) + return intersectVersion.get().maxVersion; + else throw new UnsupportedVersionException("The broker does not support " + apiKey + - " with version in range [" + minAllowedVersion + "," + maxAllowedVersion + "]. The supported" + - " range is [" + supportedVersions.minVersion + "," + supportedVersions.maxVersion + "]."); - return maxVersion; + " with version in range [" + oldestAllowedVersion + "," + latestAllowedVersion + "]. The supported" + + " range is [" + supportedVersion.minVersion + "," + supportedVersion.maxVersion + "]."); } /** @@ -227,4 +226,7 @@ public class NodeApiVersions { return supportedVersions.get(apiKey); } + public Map allSupportedApiVersions() { + return supportedVersions; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d5dbf22c02b..14615cb020f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.ApiVersion; +import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index e58469b003a..7f9b5a9b108 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; -import org.apache.kafka.clients.ApiVersion; +import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.NodeApiVersions; diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersion.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java similarity index 79% rename from clients/src/main/java/org/apache/kafka/clients/ApiVersion.java rename to clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java index 9d606bbfa7a..b1e557b140f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ApiVersion.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java @@ -14,10 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.clients; +package org.apache.kafka.common.protocol; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; -import org.apache.kafka.common.protocol.ApiKeys; + +import java.util.Optional; /** * Represents the min version and max version of an api key. @@ -53,4 +54,14 @@ public class ApiVersion { ", maxVersion= " + maxVersion + ")"; } + + public Optional intersect(ApiVersion other) { + if (other == null) { + return Optional.empty(); + } + short minVersion = (short) Math.max(this.minVersion, other.minVersion); + short maxVersion = (short) Math.min(this.maxVersion, other.maxVersion); + return minVersion > maxVersion ? Optional.empty() : + Optional.of(new ApiVersion(apiKey, minVersion, maxVersion)); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 4d7397ad96d..1261c5247c4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.common.feature.Features; import org.apache.kafka.common.feature.FinalizedVersionRange; import org.apache.kafka.common.feature.SupportedVersionRange; @@ -33,6 +34,7 @@ import org.apache.kafka.common.record.RecordBatch; import java.nio.ByteBuffer; import java.util.Map; +import java.util.Optional; /** * Possible error codes: @@ -129,6 +131,47 @@ public class ApiVersionsResponse extends AbstractResponse { return apiKeys; } + /** + * Find the commonly agreed ApiVersions between local software and the controller. + * + * @param minMagic min inter broker magic + * @param activeControllerApiVersions controller ApiVersions + * @return commonly agreed ApiVersion collection + */ + public static ApiVersionsResponseKeyCollection intersectControllerApiVersions(final byte minMagic, + final Map activeControllerApiVersions) { + ApiVersionsResponseKeyCollection apiKeys = new ApiVersionsResponseKeyCollection(); + for (ApiKeys apiKey : ApiKeys.enabledApis()) { + if (apiKey.minRequiredInterBrokerMagic <= minMagic) { + ApiVersion brokerApiVersion = new ApiVersion( + apiKey.id, + apiKey.oldestVersion(), + apiKey.latestVersion() + ); + + final ApiVersion finalApiVersion; + if (!apiKey.forwardable) { + finalApiVersion = brokerApiVersion; + } else { + Optional intersectVersion = brokerApiVersion.intersect( + activeControllerApiVersions.getOrDefault(apiKey, null)); + if (intersectVersion.isPresent()) { + finalApiVersion = intersectVersion.get(); + } else { + // Controller doesn't support this API key, or there is no intersection. + continue; + } + } + + apiKeys.add(new ApiVersionsResponseKey() + .setApiKey(finalApiVersion.apiKey) + .setMinVersion(finalApiVersion.minVersion) + .setMaxVersion(finalApiVersion.maxVersion)); + } + } + return apiKeys; + } + public static ApiVersionsResponseData createApiVersionsResponseData( final int throttleTimeMs, final Errors error, diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java index dc26d5aa90d..1a44badd073 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.junit.jupiter.api.Test; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index b7cc9f1cd04..a9dd1711c48 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.admin; -import org.apache.kafka.clients.ApiVersion; +import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.MockClient; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 2ec2561d0c7..edfb3747dc0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; -import org.apache.kafka.clients.ApiVersion; +import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.CommitFailedException; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 419cbc2afa8..1a3f450036a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -17,12 +17,17 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.protocol.ApiVersion; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.Test; import java.util.Collection; import java.util.HashSet; +import java.util.Map; import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -73,6 +78,39 @@ public class ApiVersionsResponseTest { assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch()); } + @Test + public void shouldHaveCommonlyAgreedApiVersionResponseWithControllerOnForwardableAPIs() { + final ApiKeys forwardableAPIKey = ApiKeys.CREATE_ACLS; + final ApiKeys nonForwardableAPIKey = ApiKeys.JOIN_GROUP; + final short minVersion = 0; + final short maxVersion = 1; + Map activeControllerApiVersions = Utils.mkMap( + Utils.mkEntry(forwardableAPIKey, new ApiVersion(forwardableAPIKey.id, minVersion, maxVersion)), + Utils.mkEntry(nonForwardableAPIKey, new ApiVersion(nonForwardableAPIKey.id, minVersion, maxVersion)) + ); + + ApiVersionsResponseKeyCollection commonResponse = ApiVersionsResponse.intersectControllerApiVersions( + RecordBatch.CURRENT_MAGIC_VALUE, + activeControllerApiVersions); + + verifyVersions(forwardableAPIKey.id, minVersion, maxVersion, commonResponse); + + verifyVersions(nonForwardableAPIKey.id, ApiKeys.JOIN_GROUP.oldestVersion(), + ApiKeys.JOIN_GROUP.latestVersion(), commonResponse); + } + + private void verifyVersions(short forwardableAPIKey, + short minVersion, + short maxVersion, + ApiVersionsResponseKeyCollection commonResponse) { + ApiVersionsResponseKey expectedVersionsForForwardableAPI = + new ApiVersionsResponseKey() + .setApiKey(forwardableAPIKey) + .setMinVersion(minVersion) + .setMaxVersion(maxVersion); + assertEquals(expectedVersionsForForwardableAPI, commonResponse.find(forwardableAPIKey)); + } + private Set apiKeysInResponse(final ApiVersionsResponse apiVersions) { final Set apiKeys = new HashSet<>(); for (final ApiVersionsResponseKey version : apiVersions.data().apiKeys()) { diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index c859f8d0b1e..2d6e5159611 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -17,6 +17,7 @@ package kafka.api +import org.apache.kafka.clients.NodeApiVersions import org.apache.kafka.common.config.ConfigDef.Validator import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} @@ -148,13 +149,15 @@ object ApiVersion { def apiVersionsResponse(throttleTimeMs: Int, maxMagic: Byte, - latestSupportedFeatures: Features[SupportedVersionRange]): ApiVersionsResponse = { + latestSupportedFeatures: Features[SupportedVersionRange], + controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = { apiVersionsResponse( throttleTimeMs, maxMagic, latestSupportedFeatures, Features.emptyFinalizedFeatures, - ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH + ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, + controllerApiVersions ) } @@ -162,29 +165,34 @@ object ApiVersion { maxMagic: Byte, latestSupportedFeatures: Features[SupportedVersionRange], finalizedFeatures: Features[FinalizedVersionRange], - finalizedFeaturesEpoch: Long): ApiVersionsResponse = { - val apiKeys = ApiVersionsResponse.defaultApiKeys(maxMagic) + finalizedFeaturesEpoch: Long, + controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = { + val apiKeys = controllerApiVersions match { + case None => ApiVersionsResponse.defaultApiKeys(maxMagic) + case Some(controllerApiVersion) => ApiVersionsResponse.intersectControllerApiVersions( + maxMagic, controllerApiVersion.allSupportedApiVersions()) + } + if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && - throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME) - return new ApiVersionsResponse( + throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME) { + new ApiVersionsResponse( ApiVersionsResponse.createApiVersionsResponseData( DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs, Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data.errorCode), apiKeys, latestSupportedFeatures, finalizedFeatures, - finalizedFeaturesEpoch) - ) - - new ApiVersionsResponse( - ApiVersionsResponse.createApiVersionsResponseData( - throttleTimeMs, - Errors.NONE, - apiKeys, - latestSupportedFeatures, - finalizedFeatures, - finalizedFeaturesEpoch) - ) + finalizedFeaturesEpoch)) + } else { + new ApiVersionsResponse( + ApiVersionsResponse.createApiVersionsResponseData( + throttleTimeMs, + Errors.NONE, + apiKeys, + latestSupportedFeatures, + finalizedFeatures, + finalizedFeaturesEpoch)) + } } } diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index dac882ef58a..43434eebee2 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -18,6 +18,7 @@ package kafka.server import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.atomic.AtomicReference import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} import kafka.utils.Logging @@ -50,6 +51,8 @@ class BrokerToControllerChannelManager( ) extends Logging { private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ") private val manualMetadataUpdater = new ManualMetadataUpdater() + private val apiVersions = new ApiVersions() + private val currentNodeApiVersions = NodeApiVersions.create() private val requestThread = newRequestThread def start(): Unit = { @@ -102,8 +105,8 @@ class BrokerToControllerChannelManager( config.connectionSetupTimeoutMaxMs, ClientDnsLookup.USE_ALL_DNS_IPS, time, - false, - new ApiVersions, + true, + apiVersions, logContext ) } @@ -140,6 +143,14 @@ class BrokerToControllerChannelManager( callback )) } + + def controllerApiVersions(): Option[NodeApiVersions] = + requestThread.activeControllerAddress().flatMap( + activeController => if (activeController.id() == config.brokerId) + Some(currentNodeApiVersions) + else + Option(apiVersions.get(activeController.idString())) + ) } abstract class ControllerRequestCompletionHandler extends RequestCompletionHandler { @@ -169,11 +180,19 @@ class BrokerToControllerRequestThread( ) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) { private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]() - private var activeController: Option[Node] = None + private val activeController = new AtomicReference[Node](null) + + def activeControllerAddress(): Option[Node] = { + Option(activeController.get()) + } + + private def updateControllerAddress(newActiveController: Node): Unit = { + activeController.set(newActiveController) + } def enqueue(request: BrokerToControllerQueueItem): Unit = { requestQueue.add(request) - if (activeController.isDefined) { + if (activeControllerAddress().isDefined) { wakeup() } } @@ -190,14 +209,17 @@ class BrokerToControllerRequestThread( if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) { requestIter.remove() request.callback.onTimeout() - } else if (activeController.isDefined) { - requestIter.remove() - return Some(RequestAndCompletionHandler( - time.milliseconds(), - activeController.get, - request.request, - handleResponse(request) - )) + } else { + val controllerAddress = activeControllerAddress() + if (controllerAddress.isDefined) { + requestIter.remove() + return Some(RequestAndCompletionHandler( + time.milliseconds(), + controllerAddress.get, + request.request, + handleResponse(request) + )) + } } } None @@ -205,12 +227,15 @@ class BrokerToControllerRequestThread( private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = { if (response.wasDisconnected()) { - activeController = None + updateControllerAddress(null) requestQueue.putFirst(request) } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) { // just close the controller connection and wait for metadata cache update in doWork - networkClient.disconnect(activeController.get.idString) - activeController = None + activeControllerAddress().foreach { controllerAddress => { + networkClient.disconnect(controllerAddress.idString) + updateControllerAddress(null) + }} + requestQueue.putFirst(request) } else { request.callback.onComplete(response) @@ -218,7 +243,7 @@ class BrokerToControllerRequestThread( } override def doWork(): Unit = { - if (activeController.isDefined) { + if (activeControllerAddress().isDefined) { super.pollOnce(Long.MaxValue) } else { debug("Controller isn't cached, looking for local metadata changes") @@ -227,9 +252,8 @@ class BrokerToControllerRequestThread( case Some(controller) => info(s"Recorded new controller, from now on will use broker $controller") val controllerNode = controller.node(listenerName) - activeController = Some(controllerNode) + updateControllerAddress(controllerNode) metadataUpdater.setNodes(Seq(controllerNode).asJava) - case None => // need to backoff to avoid tight loops debug("No controller defined in metadata cache, retrying after backoff") diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala index e4261a73a3a..8d285e80ae0 100644 --- a/core/src/main/scala/kafka/server/ForwardingManager.scala +++ b/core/src/main/scala/kafka/server/ForwardingManager.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import kafka.network.RequestChannel import kafka.utils.Logging -import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.clients.{ClientResponse, NodeApiVersions} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestHeader} @@ -33,9 +33,11 @@ import scala.concurrent.TimeoutException trait ForwardingManager { def forwardRequest( request: RequestChannel.Request, - responseCallback: AbstractResponse => Unit + responseCallback: Option[AbstractResponse] => Unit ): Unit + def controllerApiVersions(): Option[NodeApiVersions] + def start(): Unit = {} def shutdown(): Unit = {} @@ -75,9 +77,17 @@ class ForwardingManagerImpl( channelManager.shutdown() } + /** + * Forward given request to the active controller. + * + * @param request request to be forwarded + * @param responseCallback callback which takes in an `Option[AbstractResponse]`, where + * None is indicating that controller doesn't support the request + * version. + */ override def forwardRequest( request: RequestChannel.Request, - responseCallback: AbstractResponse => Unit + responseCallback: Option[AbstractResponse] => Unit ): Unit = { val principalSerde = request.context.principalSerde.asScala.getOrElse( throw new IllegalArgumentException(s"Cannot deserialize principal from request $request " + @@ -98,30 +108,41 @@ class ForwardingManagerImpl( val envelopeError = envelopeResponse.error() val requestBody = request.body[AbstractRequest] - val response = if (envelopeError != Errors.NONE) { - // An envelope error indicates broker misconfiguration (e.g. the principal serde - // might not be defined on the receiving broker). In this case, we do not return - // the error directly to the client since it would not be expected. Instead we - // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem - // on the broker. - debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError") - requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception) + // Unsupported version indicates an incompatibility between controller and client API versions. This + // could happen when the controller changed after the connection was established. The forwarding broker + // should close the connection with the client and let it reinitialize the connection and refresh + // the controller API versions. + if (envelopeError == Errors.UNSUPPORTED_VERSION) { + responseCallback(None) } else { - parseResponse(envelopeResponse.responseData, requestBody, request.header) + val response = if (envelopeError != Errors.NONE) { + // A general envelope error indicates broker misconfiguration (e.g. the principal serde + // might not be defined on the receiving broker). In this case, we do not return + // the error directly to the client since it would not be expected. Instead we + // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem + // on the broker. + debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError") + requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception) + } else { + parseResponse(envelopeResponse.responseData, requestBody, request.header) + } + responseCallback(Option(response)) } - responseCallback(response) } override def onTimeout(): Unit = { debug(s"Forwarding of the request $request failed due to timeout exception") val response = request.body[AbstractRequest].getErrorResponse(new TimeoutException) - responseCallback(response) + responseCallback(Option(response)) } } channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler) } + override def controllerApiVersions(): Option[NodeApiVersions] = + channelManager.controllerApiVersions() + private def parseResponse( buffer: ByteBuffer, request: AbstractRequest, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b06969cd54c..1b5c4d078a8 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -49,7 +49,7 @@ import org.apache.kafka.common.internals.{FatalExitError, Topic} import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult -import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} +import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, ApiVersionsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData} import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection} import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection} import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse} @@ -62,7 +62,6 @@ import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse} import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic -import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ClientInformation, ListenerName, Send} @@ -139,8 +138,15 @@ class KafkaApis(val requestChannel: RequestChannel, request: RequestChannel.Request, handler: RequestChannel.Request => Unit ): Unit = { - def responseCallback(response: AbstractResponse): Unit = { - requestHelper.sendForwardedResponse(request, response) + def responseCallback(responseOpt: Option[AbstractResponse]): Unit = { + responseOpt match { + case Some(response) => requestHelper.sendForwardedResponse(request, response) + case None => + info(s"The client connection will be closed due to controller responded " + + s"unsupported version exception during $request forwarding. " + + s"This could happen when the controller changed after the connection was established.") + requestHelper.closeConnection(request, Collections.emptyMap()) + } } if (!request.isForwarded && !controller.isActive && isForwardingEnabled(request)) { @@ -1753,18 +1759,36 @@ class KafkaApis(val requestChannel: RequestChannel, else { val supportedFeatures = brokerFeatures.supportedFeatures val finalizedFeaturesOpt = finalizedFeatureCache.get - finalizedFeaturesOpt match { - case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse( - requestThrottleMs, - config.interBrokerProtocolVersion.recordVersion.value, - supportedFeatures, - finalizedFeatures.features, - finalizedFeatures.epoch) - case None => ApiVersion.apiVersionsResponse( - requestThrottleMs, - config.interBrokerProtocolVersion.recordVersion.value, - supportedFeatures) + val controllerApiVersions = if (isForwardingEnabled(request)) { + forwardingManager.controllerApiVersions() + } else { + None } + + val apiVersionsResponse = + finalizedFeaturesOpt match { + case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse( + requestThrottleMs, + config.interBrokerProtocolVersion.recordVersion.value, + supportedFeatures, + finalizedFeatures.features, + finalizedFeatures.epoch, + controllerApiVersions) + case None => ApiVersion.apiVersionsResponse( + requestThrottleMs, + config.interBrokerProtocolVersion.recordVersion.value, + supportedFeatures, + controllerApiVersions) + } + if (request.context.fromPrivilegedListener) { + apiVersionsResponse.data.apiKeys().add( + new ApiVersionsResponseData.ApiVersionsResponseKey() + .setApiKey(ApiKeys.ENVELOPE.id) + .setMinVersion(ApiKeys.ENVELOPE.oldestVersion()) + .setMaxVersion(ApiKeys.ENVELOPE.latestVersion()) + ) + } + apiVersionsResponse } } requestHelper.sendResponseMaybeThrottle(request, createResponseCallback) diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala index 5792c9c262d..81991bf6528 100644 --- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala +++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala @@ -34,7 +34,16 @@ import scala.jdk.CollectionConverters._ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { - def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps) + def generateConfigs: Seq[KafkaConfig] = + TestUtils.createBrokerConfigs(1, zkConnect).map(props => { + // Configure control plane listener to make sure we have separate listeners from client, + // in order to avoid returning Envelope API version. + props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER") + props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT") + props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0") + props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:0") + props + }).map(KafkaConfig.fromProps) @Test(timeout=120000) def checkBrokerApiVersionCommandOutput(): Unit = { @@ -55,13 +64,10 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}" val usableVersion = nodeApiVersions.latestUsableVersion(apiKey) - // Admin client should not see ENVELOPE supported versions as its a broker-internal API. - val usableVersionInfo = if (apiKey == ApiKeys.ENVELOPE) "UNSUPPORTED" else - s"$versionRangeStr [usable: $usableVersion]" val terminator = if (apiKey == enabledApis.last) "" else "," - val line = s"\t${apiKey.name}(${apiKey.id}): $usableVersionInfo$terminator" + val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator" assertTrue(lineIter.hasNext) assertEquals(line, lineIter.next()) } diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala index 7b65bea5561..8f1f287a807 100644 --- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala +++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.cluster.{Broker, EndPoint} import kafka.utils.TestUtils import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient} +import org.apache.kafka.common.Node import org.apache.kafka.common.feature.Features import org.apache.kafka.common.feature.Features.emptySupportedFeatures import org.apache.kafka.common.message.MetadataRequestData @@ -177,10 +178,11 @@ class BrokerToControllerRequestThreadTest { val metadataCache = mock(classOf[MetadataCache]) val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + val port = 1234 val oldController = new Broker(oldControllerId, - Seq(new EndPoint("host1", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures) + Seq(new EndPoint("host1", port, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures) val newController = new Broker(2, - Seq(new EndPoint("host2", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures) + Seq(new EndPoint("host2", port, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures) when(metadataCache.getControllerId).thenReturn(Some(oldControllerId), Some(newControllerId)) when(metadataCache.getAliveBrokers).thenReturn(Seq(oldController, newController)) @@ -204,18 +206,26 @@ class BrokerToControllerRequestThreadTest { testRequestThread.enqueue(queueItem) // initialize to the controller testRequestThread.doWork() + + val oldBrokerNode = new Node(oldControllerId, "host1", port) + assertEquals(Some(oldBrokerNode), testRequestThread.activeControllerAddress()) + // send and process the request mockClient.prepareResponse((body: AbstractRequest) => { body.isInstanceOf[MetadataRequest] && body.asInstanceOf[MetadataRequest].allowAutoTopicCreation() }, responseWithNotControllerError) testRequestThread.doWork() + assertEquals(None, testRequestThread.activeControllerAddress()) // reinitialize the controller to a different node testRequestThread.doWork() // process the request again mockClient.prepareResponse(expectedResponse) testRequestThread.doWork() + val newControllerNode = new Node(newControllerId, "host2", port) + assertEquals(Some(newControllerNode), testRequestThread.activeControllerAddress()) + assertTrue(completionHandler.completed.get()) } diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala index d0a4216fcce..363a3eb08d1 100644 --- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala +++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala @@ -180,7 +180,8 @@ class ApiVersionTest { val response = ApiVersion.apiVersionsResponse( 10, RecordBatch.MAGIC_VALUE_V1, - Features.emptySupportedFeatures + Features.emptySupportedFeatures, + None ) verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1) assertEquals(10, response.throttleTimeMs) @@ -198,7 +199,8 @@ class ApiVersionTest { Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange(1.toShort, 4.toShort)))), Features.finalizedFeatures( Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange(2.toShort, 3.toShort)))), - 10 + 10, + None ) verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1) @@ -227,7 +229,8 @@ class ApiVersionTest { val response = ApiVersion.apiVersionsResponse( AbstractResponse.DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE, - Features.emptySupportedFeatures + Features.emptySupportedFeatures, + None ) assertEquals(new util.HashSet[ApiKeys](ApiKeys.enabledApis), apiKeysInResponse(response)) assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs) @@ -241,7 +244,8 @@ class ApiVersionTest { val response = ApiVersion.apiVersionsResponse( AbstractResponse.DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE, - Features.emptySupportedFeatures + Features.emptySupportedFeatures, + None ) // Ensure that APIs needed for the internal metadata quorum (KIP-500) diff --git a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala index b699da6735b..3f38eb2759f 100644 --- a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala +++ b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala @@ -21,9 +21,9 @@ import java.util import java.util.Collections import org.apache.kafka.clients.MockClient.MockMetadataUpdater -import org.apache.kafka.clients.{ApiVersion, MockClient, NodeApiVersions} +import org.apache.kafka.clients.{MockClient, NodeApiVersions} import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, VoteResponseData} -import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} +import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, ApiVersion, Errors} import org.apache.kafka.common.requests.{AbstractResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, EndQuorumEpochRequest, EndQuorumEpochResponse, FetchResponse, VoteRequest, VoteResponse} import org.apache.kafka.common.utils.{MockTime, Time} import org.apache.kafka.common.{Node, TopicPartition} diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 48d7e214e53..fc974067c36 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -16,7 +16,10 @@ */ package kafka.server +import java.util.Properties + import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse} import org.junit.Assert._ @@ -25,6 +28,16 @@ import scala.jdk.CollectionConverters._ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest { + def controlPlaneListenerName = new ListenerName("CONTROLLER") + + // Configure control plane listener to make sure we have separate listeners for testing. + override def brokerPropertyOverrides(properties: Properties): Unit = { + properties.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListenerName.value()) + properties.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${controlPlaneListenerName.value()}:$securityProtocol,$securityProtocol:$securityProtocol") + properties.setProperty("listeners", s"$securityProtocol://localhost:0,${controlPlaneListenerName.value()}://localhost:0") + properties.setProperty(KafkaConfig.AdvertisedListenersProp, s"$securityProtocol://localhost:0,${controlPlaneListenerName.value()}://localhost:0") + } + def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = { val overrideHeader = nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue) val socket = connect(anySocketServer) @@ -34,10 +47,13 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest { } finally socket.close() } - def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse): Unit = { - val enabledPublicApis = ApiKeys.enabledApis() + def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = interBrokerListenerName): Unit = { + val expectedApis = ApiKeys.enabledApis() + if (listenerName == controlPlaneListenerName) { + expectedApis.add(ApiKeys.ENVELOPE) + } assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.", - enabledPublicApis.size(), apiVersionsResponse.data.apiKeys().size()) + expectedApis.size(), apiVersionsResponse.data.apiKeys().size()) for (expectedApiVersion: ApiVersionsResponseKey <- ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.apiKeys().asScala) { val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey) assertNotNull(s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.", actualApiVersion) diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index 916535bb8e3..f63601d41a4 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -18,6 +18,7 @@ package kafka.server import org.apache.kafka.common.message.ApiVersionsRequestData +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse} import org.junit.Assert._ @@ -34,6 +35,13 @@ class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest { validateApiVersionsResponse(apiVersionsResponse) } + @Test + def testApiVersionsRequestThroughControlPlaneListener(): Unit = { + val request = new ApiVersionsRequest.Builder().build() + val apiVersionsResponse = sendApiVersionsRequest(request, super.controlPlaneListenerName) + validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName) + } + @Test def testApiVersionsRequestWithUnsupportedVersion(): Unit = { val apiVersionsRequest = new ApiVersionsRequest.Builder().build() @@ -53,6 +61,13 @@ class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest { validateApiVersionsResponse(apiVersionsResponse) } + @Test + def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = { + val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short]) + val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, super.controlPlaneListenerName) + validateApiVersionsResponse(apiVersionsResponse, super.controlPlaneListenerName) + } + @Test def testApiVersionsRequestValidationV3(): Unit = { // Invalid request because Name and Version are empty by default @@ -61,8 +76,8 @@ class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest { assertEquals(Errors.INVALID_REQUEST.code(), apiVersionsResponse.data.errorCode()) } - private def sendApiVersionsRequest(request: ApiVersionsRequest): ApiVersionsResponse = { - connectAndReceive[ApiVersionsResponse](request) + private def sendApiVersionsRequest(request: ApiVersionsRequest, + listenerName: ListenerName = super.listenerName): ApiVersionsResponse = { + connectAndReceive[ApiVersionsResponse](request, listenerName = listenerName) } - } diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala index ca03359f907..7ef7919f516 100644 --- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala @@ -19,6 +19,7 @@ package kafka.server import java.net.InetAddress import java.nio.ByteBuffer import java.util.Optional +import java.util.concurrent.atomic.AtomicBoolean import kafka.network import kafka.network.RequestChannel @@ -73,16 +74,57 @@ class ForwardingManagerTest { }) var response: AbstractResponse = null - forwardingManager.forwardRequest(request, res => response = res) + forwardingManager.forwardRequest(request, result => response = result.orNull) assertNotNull(response) assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, response.errorCounts()) } + @Test + def testUnsupportedVersions(): Unit = { + val forwardingManager = new ForwardingManagerImpl(brokerToController) + val requestCorrelationId = 27 + val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client") + + val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo") + val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava + val requestBody = new AlterConfigsRequest.Builder(Map( + configResource -> new AlterConfigsRequest.Config(configs) + ).asJava, false).build() + val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId) + val request = buildRequest(requestHeader, requestBuffer, clientPrincipal) + + val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData()) + + val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, + requestHeader.apiVersion, requestCorrelationId) + + Mockito.when(brokerToController.sendRequest( + any(classOf[EnvelopeRequest.Builder]), + any(classOf[ControllerRequestCompletionHandler]) + )).thenAnswer(invocation => { + val completionHandler = invocation.getArgument[RequestCompletionHandler](1) + val response = buildEnvelopeResponse(responseBuffer, 30, + completionHandler, Errors.UNSUPPORTED_VERSION) + response.onComplete() + }) + + var response: AbstractResponse = null + val connectionClosed = new AtomicBoolean(false) + forwardingManager.forwardRequest(request, res => { + response = res.orNull + connectionClosed.set(true) + }) + + assertTrue(connectionClosed.get()) + assertNull(response) + } + private def buildEnvelopeResponse( responseBuffer: ByteBuffer, correlationId: Int, - completionHandler: RequestCompletionHandler + completionHandler: RequestCompletionHandler, + error: Errors = Errors.NONE ): ClientResponse = { val envelopeRequestHeader = new RequestHeader( ApiKeys.ENVELOPE, @@ -92,7 +134,7 @@ class ForwardingManagerTest { ) val envelopeResponse = new EnvelopeResponse( responseBuffer, - Errors.NONE + error ) new ClientResponse( diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 62d050ce22a..5bad37e7901 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -36,6 +36,7 @@ import kafka.network.RequestChannel.{CloseConnectionResponse, SendResponse} import kafka.server.QuotaFactory.QuotaManagers import kafka.utils.{MockTime, TestUtils} import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.NodeApiVersions import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.common.acl.AclOperation @@ -519,7 +520,7 @@ class KafkaApisTest { EasyMock.expect(forwardingManager.forwardRequest( EasyMock.eq(request), - anyObject[AbstractResponse => Unit]() + anyObject[Option[AbstractResponse] => Unit]() )).once() EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager) @@ -690,6 +691,78 @@ class KafkaApisTest { } } + @Test + def testHandleApiVersionsWithControllerApiVersions(): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + + val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0) + + val permittedVersion: Short = 0 + EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn( + Some(NodeApiVersions.create(ApiKeys.ALTER_CONFIGS.id, permittedVersion, permittedVersion))) + + val capturedResponse = expectNoThrottling() + + val apiVersionsRequest = new ApiVersionsRequest.Builder() + .build(requestHeader.apiVersion) + val request = buildRequest(apiVersionsRequest, + fromPrivilegedListener = true, requestHeader = Option(requestHeader)) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, forwardingManager, + requestChannel, authorizer, adminManager, controller) + + createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handleApiVersionsRequest(request) + + val expectedVersions = new ApiVersionsResponseData.ApiVersionsResponseKey() + .setApiKey(ApiKeys.ALTER_CONFIGS.id) + .setMaxVersion(permittedVersion) + .setMinVersion(permittedVersion) + + val response = readResponse(apiVersionsRequest, capturedResponse) + .asInstanceOf[ApiVersionsResponse] + assertEquals(Errors.NONE, Errors.forCode(response.data().errorCode())) + + val alterConfigVersions = response.data().apiKeys().find(ApiKeys.ALTER_CONFIGS.id) + assertEquals(expectedVersions, alterConfigVersions) + + verify(authorizer, adminManager, forwardingManager) + } + + @Test + def testGetUnsupportedVersionsWhenControllerApiVersionsNotAvailable(): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + + val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0) + + EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(None) + + val capturedResponse = expectNoThrottling() + + val apiVersionsRequest = new ApiVersionsRequest.Builder() + .build(requestHeader.apiVersion) + val request = buildRequest(apiVersionsRequest, + fromPrivilegedListener = true, requestHeader = Option(requestHeader)) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, forwardingManager, + requestChannel, authorizer, adminManager, controller) + + createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handleApiVersionsRequest(request) + + val response = readResponse(apiVersionsRequest, capturedResponse) + .asInstanceOf[ApiVersionsResponse] + assertEquals(Errors.NONE, Errors.forCode(response.data().errorCode())) + + val expectedVersions = new ApiVersionsResponseData.ApiVersionsResponseKey() + .setApiKey(ApiKeys.ALTER_CONFIGS.id) + .setMaxVersion(ApiKeys.ALTER_CONFIGS.latestVersion()) + .setMinVersion(ApiKeys.ALTER_CONFIGS.oldestVersion()) + + val alterConfigVersions = response.data().apiKeys().find(ApiKeys.ALTER_CONFIGS.id) + assertEquals(expectedVersions, alterConfigVersions) + + verify(authorizer, adminManager, forwardingManager) + } + @Test def testCreateTopicsWithAuthorizer(): Unit = { val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])