KAFKA-10674: Controller API version bound with forwardable APIs (#9600)

Set up the intersection of controller API versions for client queries. When an unsupported-version error is hit in the EnvelopeResponse, close the client connection so that the client rediscovers the API versions.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Boyang Chen 2021-01-15 20:36:25 -08:00 committed by GitHub
parent 8f063c15da
commit bfc96efa3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 444 additions and 106 deletions

View File

@ -23,12 +23,14 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
/**
@ -123,21 +125,18 @@ public class NodeApiVersions {
* Get the latest version supported by the broker within an allowed range of versions
*/
public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
ApiVersion usableVersion = supportedVersions.get(apiKey);
if (usableVersion == null)
if (!supportedVersions.containsKey(apiKey))
throw new UnsupportedVersionException("The broker does not support " + apiKey);
return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion);
}
ApiVersion supportedVersion = supportedVersions.get(apiKey);
Optional<ApiVersion> intersectVersion = supportedVersion.intersect(
new ApiVersion(apiKey.id, oldestAllowedVersion, latestAllowedVersion));
private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions,
short minAllowedVersion, short maxAllowedVersion) {
short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
if (minVersion > maxVersion)
if (intersectVersion.isPresent())
return intersectVersion.get().maxVersion;
else
throw new UnsupportedVersionException("The broker does not support " + apiKey +
" with version in range [" + minAllowedVersion + "," + maxAllowedVersion + "]. The supported" +
" range is [" + supportedVersions.minVersion + "," + supportedVersions.maxVersion + "].");
return maxVersion;
" with version in range [" + oldestAllowedVersion + "," + latestAllowedVersion + "]. The supported" +
" range is [" + supportedVersion.minVersion + "," + supportedVersion.maxVersion + "].");
}
/**
@ -227,4 +226,7 @@ public class NodeApiVersions {
return supportedVersions.get(apiKey);
}
/**
 * Returns all api versions advertised by this node, keyed by api.
 *
 * NOTE(review): this hands out the internal map directly — callers must
 * treat it as read-only; consider wrapping in Collections.unmodifiableMap
 * to enforce that.
 */
public Map<ApiKeys, ApiVersion> allSupportedApiVersions() {
return supportedVersions;
}
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;

View File

@ -16,7 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.NodeApiVersions;

View File

@ -14,10 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.protocol.ApiKeys;
import java.util.Optional;
/**
* Represents the min version and max version of an api key.
@ -53,4 +54,14 @@ public class ApiVersion {
", maxVersion= " + maxVersion +
")";
}
/**
 * Compute the overlap between this version range and {@code other}.
 *
 * @param other the supported range of the same api on the peer; may be null,
 *              meaning the peer does not advertise this api at all
 * @return the common range, or {@link Optional#empty()} when {@code other}
 *         is null or the two ranges do not overlap
 * @throws IllegalArgumentException if {@code other} describes a different api key,
 *         since intersecting ranges of different apis is always a caller bug
 */
public Optional<ApiVersion> intersect(ApiVersion other) {
    if (other == null) {
        return Optional.empty();
    }
    if (other.apiKey != apiKey) {
        throw new IllegalArgumentException("Cannot intersect versions of different api keys: "
            + apiKey + " vs " + other.apiKey);
    }
    short minVersion = (short) Math.max(this.minVersion, other.minVersion);
    short maxVersion = (short) Math.min(this.maxVersion, other.maxVersion);
    return minVersion > maxVersion ? Optional.empty() :
        Optional.of(new ApiVersion(apiKey, minVersion, maxVersion));
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.FinalizedVersionRange;
import org.apache.kafka.common.feature.SupportedVersionRange;
@ -33,6 +34,7 @@ import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
/**
* Possible error codes:
@ -129,6 +131,47 @@ public class ApiVersionsResponse extends AbstractResponse {
return apiKeys;
}
/**
* Find the commonly agreed ApiVersions between local software and the controller.
*
* @param minMagic min inter broker magic
* @param activeControllerApiVersions controller ApiVersions
* @return commonly agreed ApiVersion collection
*/
/**
 * Build the collection of api versions that both this broker and the active
 * controller can handle, so clients are only offered versions that are safe
 * to forward.
 *
 * @param minMagic min inter broker magic
 * @param activeControllerApiVersions api versions advertised by the active controller
 * @return the version ranges agreed on by broker and controller
 */
public static ApiVersionsResponseKeyCollection intersectControllerApiVersions(final byte minMagic,
                                                                              final Map<ApiKeys, ApiVersion> activeControllerApiVersions) {
    ApiVersionsResponseKeyCollection commonApis = new ApiVersionsResponseKeyCollection();
    for (ApiKeys api : ApiKeys.enabledApis()) {
        if (api.minRequiredInterBrokerMagic > minMagic) {
            continue;
        }
        ApiVersion localRange = new ApiVersion(api.id, api.oldestVersion(), api.latestVersion());
        final ApiVersion agreedRange;
        if (api.forwardable) {
            Optional<ApiVersion> overlap = localRange.intersect(activeControllerApiVersions.get(api));
            if (!overlap.isPresent()) {
                // Either the controller does not support this api key at all,
                // or broker and controller share no common version: omit the api.
                continue;
            }
            agreedRange = overlap.get();
        } else {
            // Non-forwardable apis are served locally, so the broker's own range applies.
            agreedRange = localRange;
        }
        commonApis.add(new ApiVersionsResponseKey()
            .setApiKey(agreedRange.apiKey)
            .setMinVersion(agreedRange.minVersion)
            .setMaxVersion(agreedRange.maxVersion));
    }
    return commonApis;
}
public static ApiVersionsResponseData createApiVersionsResponseData(
final int throttleTimeMs,
final Errors error,

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.junit.jupiter.api.Test;

View File

@ -16,7 +16,7 @@
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.MockClient;

View File

@ -16,7 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;

View File

@ -17,12 +17,17 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -73,6 +78,39 @@ public class ApiVersionsResponseTest {
assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
}
@Test
public void shouldHaveCommonlyAgreedApiVersionResponseWithControllerOnForwardableAPIs() {
    // CREATE_ACLS is forwardable to the controller; JOIN_GROUP is handled locally.
    final ApiKeys forwardableKey = ApiKeys.CREATE_ACLS;
    final ApiKeys nonForwardableKey = ApiKeys.JOIN_GROUP;
    final short controllerMinVersion = 0;
    final short controllerMaxVersion = 1;
    Map<ApiKeys, ApiVersion> controllerVersions = Utils.mkMap(
        Utils.mkEntry(forwardableKey, new ApiVersion(forwardableKey.id, controllerMinVersion, controllerMaxVersion)),
        Utils.mkEntry(nonForwardableKey, new ApiVersion(nonForwardableKey.id, controllerMinVersion, controllerMaxVersion))
    );

    ApiVersionsResponseKeyCollection commonResponse = ApiVersionsResponse.intersectControllerApiVersions(
        RecordBatch.CURRENT_MAGIC_VALUE,
        controllerVersions);

    // Forwardable apis are clamped to the controller's advertised range ...
    verifyVersions(forwardableKey.id, controllerMinVersion, controllerMaxVersion, commonResponse);
    // ... while non-forwardable apis keep the broker's full local range.
    verifyVersions(nonForwardableKey.id, ApiKeys.JOIN_GROUP.oldestVersion(),
        ApiKeys.JOIN_GROUP.latestVersion(), commonResponse);
}
/**
 * Assert that the response advertises exactly the expected version range for
 * the given api key.
 *
 * The parameter was previously named {@code forwardableAPIKey}, which was
 * misleading: this helper is also invoked for non-forwardable apis.
 *
 * @param apiKeyId the id of the api key to look up
 * @param minVersion expected minimum supported version
 * @param maxVersion expected maximum supported version
 * @param commonResponse the intersected version collection under test
 */
private void verifyVersions(short apiKeyId,
                            short minVersion,
                            short maxVersion,
                            ApiVersionsResponseKeyCollection commonResponse) {
    ApiVersionsResponseKey expectedVersions =
        new ApiVersionsResponseKey()
            .setApiKey(apiKeyId)
            .setMinVersion(minVersion)
            .setMaxVersion(maxVersion);
    assertEquals(expectedVersions, commonResponse.find(apiKeyId));
}
private Set<ApiKeys> apiKeysInResponse(final ApiVersionsResponse apiVersions) {
final Set<ApiKeys> apiKeys = new HashSet<>();
for (final ApiVersionsResponseKey version : apiVersions.data().apiKeys()) {

View File

@ -17,6 +17,7 @@
package kafka.api
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.config.ConfigDef.Validator
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
@ -148,13 +149,15 @@ object ApiVersion {
def apiVersionsResponse(throttleTimeMs: Int,
maxMagic: Byte,
latestSupportedFeatures: Features[SupportedVersionRange]): ApiVersionsResponse = {
latestSupportedFeatures: Features[SupportedVersionRange],
controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
apiVersionsResponse(
throttleTimeMs,
maxMagic,
latestSupportedFeatures,
Features.emptyFinalizedFeatures,
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH
ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
controllerApiVersions
)
}
@ -162,29 +165,34 @@ object ApiVersion {
maxMagic: Byte,
latestSupportedFeatures: Features[SupportedVersionRange],
finalizedFeatures: Features[FinalizedVersionRange],
finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
val apiKeys = ApiVersionsResponse.defaultApiKeys(maxMagic)
finalizedFeaturesEpoch: Long,
controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
val apiKeys = controllerApiVersions match {
case None => ApiVersionsResponse.defaultApiKeys(maxMagic)
case Some(controllerApiVersion) => ApiVersionsResponse.intersectControllerApiVersions(
maxMagic, controllerApiVersion.allSupportedApiVersions())
}
if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE &&
throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME)
return new ApiVersionsResponse(
throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME) {
new ApiVersionsResponse(
ApiVersionsResponse.createApiVersionsResponseData(
DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs,
Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data.errorCode),
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch)
)
new ApiVersionsResponse(
ApiVersionsResponse.createApiVersionsResponseData(
throttleTimeMs,
Errors.NONE,
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch)
)
finalizedFeaturesEpoch))
} else {
new ApiVersionsResponse(
ApiVersionsResponse.createApiVersionsResponseData(
throttleTimeMs,
Errors.NONE,
apiKeys,
latestSupportedFeatures,
finalizedFeatures,
finalizedFeaturesEpoch))
}
}
}

View File

@ -18,6 +18,7 @@
package kafka.server
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicReference
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.utils.Logging
@ -50,6 +51,8 @@ class BrokerToControllerChannelManager(
) extends Logging {
private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
private val currentNodeApiVersions = NodeApiVersions.create()
private val requestThread = newRequestThread
def start(): Unit = {
@ -102,8 +105,8 @@ class BrokerToControllerChannelManager(
config.connectionSetupTimeoutMaxMs,
ClientDnsLookup.USE_ALL_DNS_IPS,
time,
false,
new ApiVersions,
true,
apiVersions,
logContext
)
}
@ -140,6 +143,14 @@ class BrokerToControllerChannelManager(
callback
))
}
// Returns the api versions of the active controller, if one is currently known.
// When this broker is itself the active controller, no ApiVersionsResponse was
// ever exchanged over the network, so we fall back to our own locally
// constructed NodeApiVersions instead of looking it up in `apiVersions`.
def controllerApiVersions(): Option[NodeApiVersions] =
  requestThread.activeControllerAddress().flatMap(
    activeController => if (activeController.id() == config.brokerId)
      Some(currentNodeApiVersions)
    else
      Option(apiVersions.get(activeController.idString()))
  )
}
abstract class ControllerRequestCompletionHandler extends RequestCompletionHandler {
@ -169,11 +180,19 @@ class BrokerToControllerRequestThread(
) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
private var activeController: Option[Node] = None
private val activeController = new AtomicReference[Node](null)
// Thread-safe snapshot of the currently known controller node, or None if
// no controller has been discovered yet (or it was cleared on disconnect).
def activeControllerAddress(): Option[Node] = {
  Option(activeController.get())
}

// Atomically records the new controller node; callers pass null to clear it.
private def updateControllerAddress(newActiveController: Node): Unit = {
  activeController.set(newActiveController)
}
// Queues a request destined for the active controller. If a controller is
// already known we wake the network thread so the request is sent promptly;
// otherwise doWork picks the request up once a controller is discovered.
def enqueue(request: BrokerToControllerQueueItem): Unit = {
  requestQueue.add(request)
  if (activeControllerAddress().isDefined) {
    wakeup()
  }
}
@ -190,14 +209,17 @@ class BrokerToControllerRequestThread(
if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
requestIter.remove()
request.callback.onTimeout()
} else if (activeController.isDefined) {
requestIter.remove()
return Some(RequestAndCompletionHandler(
time.milliseconds(),
activeController.get,
request.request,
handleResponse(request)
))
} else {
val controllerAddress = activeControllerAddress()
if (controllerAddress.isDefined) {
requestIter.remove()
return Some(RequestAndCompletionHandler(
time.milliseconds(),
controllerAddress.get,
request.request,
handleResponse(request)
))
}
}
}
None
@ -205,12 +227,15 @@ class BrokerToControllerRequestThread(
private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
if (response.wasDisconnected()) {
activeController = None
updateControllerAddress(null)
requestQueue.putFirst(request)
} else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
// just close the controller connection and wait for metadata cache update in doWork
networkClient.disconnect(activeController.get.idString)
activeController = None
activeControllerAddress().foreach { controllerAddress => {
networkClient.disconnect(controllerAddress.idString)
updateControllerAddress(null)
}}
requestQueue.putFirst(request)
} else {
request.callback.onComplete(response)
@ -218,7 +243,7 @@ class BrokerToControllerRequestThread(
}
override def doWork(): Unit = {
if (activeController.isDefined) {
if (activeControllerAddress().isDefined) {
super.pollOnce(Long.MaxValue)
} else {
debug("Controller isn't cached, looking for local metadata changes")
@ -227,9 +252,8 @@ class BrokerToControllerRequestThread(
case Some(controller) =>
info(s"Recorded new controller, from now on will use broker $controller")
val controllerNode = controller.node(listenerName)
activeController = Some(controllerNode)
updateControllerAddress(controllerNode)
metadataUpdater.setNodes(Seq(controllerNode).asJava)
case None =>
// need to backoff to avoid tight loops
debug("No controller defined in metadata cache, retrying after backoff")

View File

@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import kafka.network.RequestChannel
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestHeader}
@ -33,9 +33,11 @@ import scala.concurrent.TimeoutException
trait ForwardingManager {
def forwardRequest(
request: RequestChannel.Request,
responseCallback: AbstractResponse => Unit
responseCallback: Option[AbstractResponse] => Unit
): Unit
def controllerApiVersions(): Option[NodeApiVersions]
def start(): Unit = {}
def shutdown(): Unit = {}
@ -75,9 +77,17 @@ class ForwardingManagerImpl(
channelManager.shutdown()
}
/**
* Forward given request to the active controller.
*
* @param request request to be forwarded
* @param responseCallback callback which takes in an `Option[AbstractResponse]`, where
* None is indicating that controller doesn't support the request
* version.
*/
override def forwardRequest(
request: RequestChannel.Request,
responseCallback: AbstractResponse => Unit
responseCallback: Option[AbstractResponse] => Unit
): Unit = {
val principalSerde = request.context.principalSerde.asScala.getOrElse(
throw new IllegalArgumentException(s"Cannot deserialize principal from request $request " +
@ -98,30 +108,41 @@ class ForwardingManagerImpl(
val envelopeError = envelopeResponse.error()
val requestBody = request.body[AbstractRequest]
val response = if (envelopeError != Errors.NONE) {
// An envelope error indicates broker misconfiguration (e.g. the principal serde
// might not be defined on the receiving broker). In this case, we do not return
// the error directly to the client since it would not be expected. Instead we
// return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
// on the broker.
debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
// Unsupported version indicates an incompatibility between controller and client API versions. This
// could happen when the controller changed after the connection was established. The forwarding broker
// should close the connection with the client and let it reinitialize the connection and refresh
// the controller API versions.
if (envelopeError == Errors.UNSUPPORTED_VERSION) {
responseCallback(None)
} else {
parseResponse(envelopeResponse.responseData, requestBody, request.header)
val response = if (envelopeError != Errors.NONE) {
// A general envelope error indicates broker misconfiguration (e.g. the principal serde
// might not be defined on the receiving broker). In this case, we do not return
// the error directly to the client since it would not be expected. Instead we
// return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
// on the broker.
debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
} else {
parseResponse(envelopeResponse.responseData, requestBody, request.header)
}
responseCallback(Option(response))
}
responseCallback(response)
}
override def onTimeout(): Unit = {
debug(s"Forwarding of the request $request failed due to timeout exception")
val response = request.body[AbstractRequest].getErrorResponse(new TimeoutException)
responseCallback(response)
responseCallback(Option(response))
}
}
channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
}
// Expose the active controller's api versions (if known) so the ApiVersions
// request handler can intersect them with the broker's own supported ranges.
override def controllerApiVersions(): Option[NodeApiVersions] =
  channelManager.controllerApiVersions()
private def parseResponse(
buffer: ByteBuffer,
request: AbstractRequest,

View File

@ -49,7 +49,7 @@ import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, ApiVersionsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, MetadataResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection}
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse}
@ -62,7 +62,6 @@ import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ListenerName, Send}
@ -139,8 +138,15 @@ class KafkaApis(val requestChannel: RequestChannel,
request: RequestChannel.Request,
handler: RequestChannel.Request => Unit
): Unit = {
def responseCallback(response: AbstractResponse): Unit = {
requestHelper.sendForwardedResponse(request, response)
def responseCallback(responseOpt: Option[AbstractResponse]): Unit = {
responseOpt match {
case Some(response) => requestHelper.sendForwardedResponse(request, response)
case None =>
info(s"The client connection will be closed due to controller responded " +
s"unsupported version exception during $request forwarding. " +
s"This could happen when the controller changed after the connection was established.")
requestHelper.closeConnection(request, Collections.emptyMap())
}
}
if (!request.isForwarded && !controller.isActive && isForwardingEnabled(request)) {
@ -1753,18 +1759,36 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeaturesOpt = finalizedFeatureCache.get
finalizedFeaturesOpt match {
case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
requestThrottleMs,
config.interBrokerProtocolVersion.recordVersion.value,
supportedFeatures,
finalizedFeatures.features,
finalizedFeatures.epoch)
case None => ApiVersion.apiVersionsResponse(
requestThrottleMs,
config.interBrokerProtocolVersion.recordVersion.value,
supportedFeatures)
val controllerApiVersions = if (isForwardingEnabled(request)) {
forwardingManager.controllerApiVersions()
} else {
None
}
val apiVersionsResponse =
finalizedFeaturesOpt match {
case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
requestThrottleMs,
config.interBrokerProtocolVersion.recordVersion.value,
supportedFeatures,
finalizedFeatures.features,
finalizedFeatures.epoch,
controllerApiVersions)
case None => ApiVersion.apiVersionsResponse(
requestThrottleMs,
config.interBrokerProtocolVersion.recordVersion.value,
supportedFeatures,
controllerApiVersions)
}
if (request.context.fromPrivilegedListener) {
apiVersionsResponse.data.apiKeys().add(
new ApiVersionsResponseData.ApiVersionsResponseKey()
.setApiKey(ApiKeys.ENVELOPE.id)
.setMinVersion(ApiKeys.ENVELOPE.oldestVersion())
.setMaxVersion(ApiKeys.ENVELOPE.latestVersion())
)
}
apiVersionsResponse
}
}
requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)

View File

@ -34,7 +34,16 @@ import scala.jdk.CollectionConverters._
class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
// Single-broker cluster whose controller traffic uses a dedicated
// control-plane listener, so the client-facing listener never advertises
// the broker-internal Envelope api.
def generateConfigs: Seq[KafkaConfig] =
  TestUtils.createBrokerConfigs(1, zkConnect).map(props => {
    // Configure control plane listener to make sure we have separate listeners from client,
    // in order to avoid returning Envelope API version.
    props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")
    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
    props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
    props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
    props
  }).map(KafkaConfig.fromProps)
@Test(timeout=120000)
def checkBrokerApiVersionCommandOutput(): Unit = {
@ -55,13 +64,10 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
// Admin client should not see ENVELOPE supported versions as its a broker-internal API.
val usableVersionInfo = if (apiKey == ApiKeys.ENVELOPE) "UNSUPPORTED" else
s"$versionRangeStr [usable: $usableVersion]"
val terminator = if (apiKey == enabledApis.last) "" else ","
val line = s"\t${apiKey.name}(${apiKey.id}): $usableVersionInfo$terminator"
val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
assertTrue(lineIter.hasNext)
assertEquals(line, lineIter.next())
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.cluster.{Broker, EndPoint}
import kafka.utils.TestUtils
import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient}
import org.apache.kafka.common.Node
import org.apache.kafka.common.feature.Features
import org.apache.kafka.common.feature.Features.emptySupportedFeatures
import org.apache.kafka.common.message.MetadataRequestData
@ -177,10 +178,11 @@ class BrokerToControllerRequestThreadTest {
val metadataCache = mock(classOf[MetadataCache])
val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val port = 1234
val oldController = new Broker(oldControllerId,
Seq(new EndPoint("host1", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
Seq(new EndPoint("host1", port, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
val newController = new Broker(2,
Seq(new EndPoint("host2", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
Seq(new EndPoint("host2", port, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
when(metadataCache.getControllerId).thenReturn(Some(oldControllerId), Some(newControllerId))
when(metadataCache.getAliveBrokers).thenReturn(Seq(oldController, newController))
@ -204,18 +206,26 @@ class BrokerToControllerRequestThreadTest {
testRequestThread.enqueue(queueItem)
// initialize to the controller
testRequestThread.doWork()
val oldBrokerNode = new Node(oldControllerId, "host1", port)
assertEquals(Some(oldBrokerNode), testRequestThread.activeControllerAddress())
// send and process the request
mockClient.prepareResponse((body: AbstractRequest) => {
body.isInstanceOf[MetadataRequest] &&
body.asInstanceOf[MetadataRequest].allowAutoTopicCreation()
}, responseWithNotControllerError)
testRequestThread.doWork()
assertEquals(None, testRequestThread.activeControllerAddress())
// reinitialize the controller to a different node
testRequestThread.doWork()
// process the request again
mockClient.prepareResponse(expectedResponse)
testRequestThread.doWork()
val newControllerNode = new Node(newControllerId, "host2", port)
assertEquals(Some(newControllerNode), testRequestThread.activeControllerAddress())
assertTrue(completionHandler.completed.get())
}

View File

@ -180,7 +180,8 @@ class ApiVersionTest {
val response = ApiVersion.apiVersionsResponse(
10,
RecordBatch.MAGIC_VALUE_V1,
Features.emptySupportedFeatures
Features.emptySupportedFeatures,
None
)
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1)
assertEquals(10, response.throttleTimeMs)
@ -198,7 +199,8 @@ class ApiVersionTest {
Utils.mkMap(Utils.mkEntry("feature", new SupportedVersionRange(1.toShort, 4.toShort)))),
Features.finalizedFeatures(
Utils.mkMap(Utils.mkEntry("feature", new FinalizedVersionRange(2.toShort, 3.toShort)))),
10
10,
None
)
verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1)
@ -227,7 +229,8 @@ class ApiVersionTest {
val response = ApiVersion.apiVersionsResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
RecordBatch.CURRENT_MAGIC_VALUE,
Features.emptySupportedFeatures
Features.emptySupportedFeatures,
None
)
assertEquals(new util.HashSet[ApiKeys](ApiKeys.enabledApis), apiKeysInResponse(response))
assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs)
@ -241,7 +244,8 @@ class ApiVersionTest {
val response = ApiVersion.apiVersionsResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME,
RecordBatch.CURRENT_MAGIC_VALUE,
Features.emptySupportedFeatures
Features.emptySupportedFeatures,
None
)
// Ensure that APIs needed for the internal metadata quorum (KIP-500)

View File

@ -21,9 +21,9 @@ import java.util
import java.util.Collections
import org.apache.kafka.clients.MockClient.MockMetadataUpdater
import org.apache.kafka.clients.{ApiVersion, MockClient, NodeApiVersions}
import org.apache.kafka.clients.{MockClient, NodeApiVersions}
import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumEpochResponseData, FetchResponseData, VoteResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, ApiVersion, Errors}
import org.apache.kafka.common.requests.{AbstractResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, EndQuorumEpochRequest, EndQuorumEpochResponse, FetchResponse, VoteRequest, VoteResponse}
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.common.{Node, TopicPartition}

View File

@ -16,7 +16,10 @@
*/
package kafka.server
import java.util.Properties
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.junit.Assert._
@ -25,6 +28,16 @@ import scala.jdk.CollectionConverters._
abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
def controlPlaneListenerName = new ListenerName("CONTROLLER")
// Configure control plane listener to make sure we have separate listeners for testing.
// Configure a dedicated control plane listener next to the regular data
// listener so tests can exercise requests arriving on either path.
override def brokerPropertyOverrides(properties: Properties): Unit = {
  val controlPlane = controlPlaneListenerName.value()
  // Both listeners bind an ephemeral port on localhost and share the same
  // security protocol; the advertised listeners mirror the bound listeners.
  val listeners = s"$securityProtocol://localhost:0,$controlPlane://localhost:0"
  properties.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlane)
  properties.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
    s"$controlPlane:$securityProtocol,$securityProtocol:$securityProtocol")
  properties.setProperty("listeners", listeners)
  properties.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
}
def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
val overrideHeader = nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
val socket = connect(anySocketServer)
@ -34,10 +47,13 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
} finally socket.close()
}
def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse): Unit = {
val enabledPublicApis = ApiKeys.enabledApis()
def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = interBrokerListenerName): Unit = {
val expectedApis = ApiKeys.enabledApis()
if (listenerName == controlPlaneListenerName) {
expectedApis.add(ApiKeys.ENVELOPE)
}
assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.",
enabledPublicApis.size(), apiVersionsResponse.data.apiKeys().size())
expectedApis.size(), apiVersionsResponse.data.apiKeys().size())
for (expectedApiVersion: ApiVersionsResponseKey <- ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data.apiKeys().asScala) {
val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey)
assertNotNull(s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.", actualApiVersion)

View File

@ -18,6 +18,7 @@
package kafka.server
import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.junit.Assert._
@ -34,6 +35,13 @@ class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest {
validateApiVersionsResponse(apiVersionsResponse)
}
@Test
def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
  // An ApiVersions request sent over the control plane listener should be
  // answered and validated against that listener's expected API set.
  val listener = super.controlPlaneListenerName
  val response = sendApiVersionsRequest(new ApiVersionsRequest.Builder().build(), listener)
  validateApiVersionsResponse(response, listener)
}
@Test
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
@ -53,6 +61,13 @@ class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest {
validateApiVersionsResponse(apiVersionsResponse)
}
@Test
def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
  // A v0 ApiVersions request over the control plane listener must also pass
  // response validation. `toShort` is the idiomatic numeric conversion and
  // yields the same value as the original cast.
  val listener = super.controlPlaneListenerName
  val v0Request = new ApiVersionsRequest.Builder().build(0.toShort)
  val response = sendApiVersionsRequest(v0Request, listener)
  validateApiVersionsResponse(response, listener)
}
@Test
def testApiVersionsRequestValidationV3(): Unit = {
// Invalid request because Name and Version are empty by default
@ -61,8 +76,8 @@ class ApiVersionsRequestTest extends AbstractApiVersionsRequestTest {
assertEquals(Errors.INVALID_REQUEST.code(), apiVersionsResponse.data.errorCode())
}
private def sendApiVersionsRequest(request: ApiVersionsRequest): ApiVersionsResponse = {
connectAndReceive[ApiVersionsResponse](request)
private def sendApiVersionsRequest(request: ApiVersionsRequest,
listenerName: ListenerName = super.listenerName): ApiVersionsResponse = {
connectAndReceive[ApiVersionsResponse](request, listenerName = listenerName)
}
}

View File

@ -19,6 +19,7 @@ package kafka.server
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import kafka.network
import kafka.network.RequestChannel
@ -73,16 +74,57 @@ class ForwardingManagerTest {
})
var response: AbstractResponse = null
forwardingManager.forwardRequest(request, res => response = res)
forwardingManager.forwardRequest(request, result => response = result.orNull)
assertNotNull(response)
assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, response.errorCounts())
}
@Test
def testUnsupportedVersions(): Unit = {
// When the controller rejects an enveloped request with UNSUPPORTED_VERSION,
// the forwarding manager completes the callback with an empty result (None)
// so the caller can close the client connection and let the client
// rediscover api versions (see commit description).
val forwardingManager = new ForwardingManagerImpl(brokerToController)
val requestCorrelationId = 27
val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo")
val configs = List(new AlterConfigsRequest.ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")).asJava
val requestBody = new AlterConfigsRequest.Builder(Map(
configResource -> new AlterConfigsRequest.Config(configs)
).asJava, false).build()
val (requestHeader, requestBuffer) = buildRequest(requestBody, requestCorrelationId)
val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
// A well-formed embedded AlterConfigs response; it should be discarded because
// the envelope itself carries the UNSUPPORTED_VERSION error.
val responseBody = new AlterConfigsResponse(new AlterConfigsResponseData())
val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody,
requestHeader.apiVersion, requestCorrelationId)
// Stub the broker-to-controller channel: any EnvelopeRequest is immediately
// completed with an EnvelopeResponse whose top-level error is UNSUPPORTED_VERSION.
Mockito.when(brokerToController.sendRequest(
any(classOf[EnvelopeRequest.Builder]),
any(classOf[ControllerRequestCompletionHandler])
)).thenAnswer(invocation => {
val completionHandler = invocation.getArgument[RequestCompletionHandler](1)
val response = buildEnvelopeResponse(responseBuffer, 30,
completionHandler, Errors.UNSUPPORTED_VERSION)
response.onComplete()
})
var response: AbstractResponse = null
val connectionClosed = new AtomicBoolean(false)
forwardingManager.forwardRequest(request, res => {
// res is expected to be None here, so no response body is forwarded.
response = res.orNull
connectionClosed.set(true)
})
// The callback must have fired, and it must not have delivered a response.
assertTrue(connectionClosed.get())
assertNull(response)
}
private def buildEnvelopeResponse(
responseBuffer: ByteBuffer,
correlationId: Int,
completionHandler: RequestCompletionHandler
completionHandler: RequestCompletionHandler,
error: Errors = Errors.NONE
): ClientResponse = {
val envelopeRequestHeader = new RequestHeader(
ApiKeys.ENVELOPE,
@ -92,7 +134,7 @@ class ForwardingManagerTest {
)
val envelopeResponse = new EnvelopeResponse(
responseBuffer,
Errors.NONE
error
)
new ClientResponse(

View File

@ -36,6 +36,7 @@ import kafka.network.RequestChannel.{CloseConnectionResponse, SendResponse}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.acl.AclOperation
@ -519,7 +520,7 @@ class KafkaApisTest {
EasyMock.expect(forwardingManager.forwardRequest(
EasyMock.eq(request),
anyObject[AbstractResponse => Unit]()
anyObject[Option[AbstractResponse] => Unit]()
)).once()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager)
@ -690,6 +691,78 @@ class KafkaApisTest {
}
}
@Test
def testHandleApiVersionsWithControllerApiVersions(): Unit = {
// With forwarding enabled, the ApiVersions response should advertise the
// intersection of the broker's and the controller's supported versions for
// forwardable APIs. Here the controller only permits ALTER_CONFIGS v0.
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
val permittedVersion: Short = 0
EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(
Some(NodeApiVersions.create(ApiKeys.ALTER_CONFIGS.id, permittedVersion, permittedVersion)))
val capturedResponse = expectNoThrottling()
val apiVersionsRequest = new ApiVersionsRequest.Builder()
.build(requestHeader.apiVersion)
// fromPrivilegedListener simulates a request arriving on the privileged listener.
val request = buildRequest(apiVersionsRequest,
fromPrivilegedListener = true, requestHeader = Option(requestHeader))
EasyMock.replay(replicaManager, clientRequestQuotaManager, forwardingManager,
requestChannel, authorizer, adminManager, controller)
createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handleApiVersionsRequest(request)
// ALTER_CONFIGS must be narrowed to the single version the controller supports.
val expectedVersions = new ApiVersionsResponseData.ApiVersionsResponseKey()
.setApiKey(ApiKeys.ALTER_CONFIGS.id)
.setMaxVersion(permittedVersion)
.setMinVersion(permittedVersion)
val response = readResponse(apiVersionsRequest, capturedResponse)
.asInstanceOf[ApiVersionsResponse]
assertEquals(Errors.NONE, Errors.forCode(response.data().errorCode()))
val alterConfigVersions = response.data().apiKeys().find(ApiKeys.ALTER_CONFIGS.id)
assertEquals(expectedVersions, alterConfigVersions)
verify(authorizer, adminManager, forwardingManager)
}
@Test
def testGetUnsupportedVersionsWhenControllerApiVersionsNotAvailable(): Unit = {
// If the controller's api versions are not yet known (controllerApiVersions()
// returns None), the broker falls back to advertising its own full supported
// version range for forwardable APIs such as ALTER_CONFIGS.
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(None)
val capturedResponse = expectNoThrottling()
val apiVersionsRequest = new ApiVersionsRequest.Builder()
.build(requestHeader.apiVersion)
val request = buildRequest(apiVersionsRequest,
fromPrivilegedListener = true, requestHeader = Option(requestHeader))
EasyMock.replay(replicaManager, clientRequestQuotaManager, forwardingManager,
requestChannel, authorizer, adminManager, controller)
createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handleApiVersionsRequest(request)
val response = readResponse(apiVersionsRequest, capturedResponse)
.asInstanceOf[ApiVersionsResponse]
assertEquals(Errors.NONE, Errors.forCode(response.data().errorCode()))
// Expect the broker's own oldest..latest range, not a narrowed intersection.
val expectedVersions = new ApiVersionsResponseData.ApiVersionsResponseKey()
.setApiKey(ApiKeys.ALTER_CONFIGS.id)
.setMaxVersion(ApiKeys.ALTER_CONFIGS.latestVersion())
.setMinVersion(ApiKeys.ALTER_CONFIGS.oldestVersion())
val alterConfigVersions = response.data().apiKeys().find(ApiKeys.ALTER_CONFIGS.id)
assertEquals(expectedVersions, alterConfigVersions)
verify(authorizer, adminManager, forwardingManager)
}
@Test
def testCreateTopicsWithAuthorizer(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])