KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller (#10194)

This patch updates request `listeners` tags to be in line with what the KIP-500 broker/controller support today. We will re-enable these APIs as needed once we have added the support.

I have also updated `ControllerApis` to use `ApiVersionManager` and simplified the envelope handling logic.

Reviewers: Ron Dagostino <rdagostino@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Jason Gustafson 2021-02-25 19:38:21 -08:00 committed by GitHub
parent 1657deec37
commit 74dfe80bb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 68 additions and 134 deletions

View File

@ -16,7 +16,7 @@
{
"apiKey": 25,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker"],
"name": "AddOffsetsToTxnRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 24,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker"],
"name": "AddPartitionsToTxnRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 33,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "AlterConfigsRequest",
// Version 1 is the same as version 0.
// Version 2 enables flexible versions.

View File

@ -16,7 +16,7 @@
{
"apiKey": 45,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker"],
"name": "AlterPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 51,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "AlterUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 30,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "CreateAclsRequest",
// Version 1 adds resource pattern type.
// Version 2 enables flexible versions.

View File

@ -16,7 +16,7 @@
{
"apiKey": 38,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "CreateDelegationTokenRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 37,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "CreatePartitionsRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 31,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "DeleteAclsRequest",
// Version 1 adds the pattern type.
// Version 2 enables flexible versions.

View File

@ -16,7 +16,7 @@
{
"apiKey": 29,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "DescribeAclsRequest",
// Version 1 adds resource pattern type.
// Version 2 enables flexible versions.

View File

@ -16,7 +16,7 @@
{
"apiKey": 48,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker"],
"name": "DescribeClientQuotasRequest",
// Version 1 enables flexible versions.
"validVersions": "0-1",

View File

@ -16,7 +16,7 @@
{
"apiKey": 32,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker", "broker"],
"name": "DescribeConfigsRequest",
// Version 1 adds IncludeSynonyms.
// Version 2 is the same as version 1.

View File

@ -16,7 +16,7 @@
{
"apiKey": 41,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "DescribeDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support

View File

@ -16,7 +16,7 @@
{
"apiKey": 50,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "DescribeUserScramCredentialsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 43,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "ElectLeadersRequest",
// Version 1 implements multiple leader election types, as described by KIP-460.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 26,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker"],
"name": "EndTxnRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 40,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "ExpireDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support

View File

@ -16,7 +16,7 @@
{
"apiKey": 22,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker"],
"name": "InitProducerIdRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 46,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker"],
"name": "ListPartitionReassignmentsRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 3,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker", "broker", "controller"],
"name": "MetadataRequest",
"validVersions": "0-11",
"flexibleVersions": "9+",

View File

@ -16,7 +16,7 @@
{
"apiKey": 39,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "RenewDelegationTokenRequest",
// Version 1 is the same as version 0.
// Version 2 adds flexible version support

View File

@ -16,7 +16,7 @@
{
"apiKey": 28,
"type": "request",
"listeners": ["zkBroker", "broker"],
"listeners": ["zkBroker"],
"name": "TxnOffsetCommitRequest",
// Version 1 is the same as version 0.
//

View File

@ -16,7 +16,7 @@
{
"apiKey": 57,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"listeners": ["zkBroker"],
"name": "UpdateFeaturesRequest",
"validVersions": "0",
"flexibleVersions": "0+",

View File

@ -24,15 +24,15 @@ import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.Logging
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.Node
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.ApiVersionsResponseData.{ApiVersion, SupportedFeatureKey}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData}
import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record.BaseRecords
import org.apache.kafka.common.requests._
@ -40,9 +40,8 @@ import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.Node
import org.apache.kafka.controller.Controller
import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, FeatureMap, FeatureMapAndEpoch, VersionRange}
import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, VersionRange}
import org.apache.kafka.server.authorizer.Authorizer
import scala.collection.mutable
@ -60,77 +59,14 @@ class ControllerApis(val requestChannel: RequestChannel,
val raftManager: RaftManager[ApiMessageAndVersion],
val config: KafkaConfig,
val metaProperties: MetaProperties,
val controllerNodes: Seq[Node]) extends ApiRequestHandler with Logging {
val controllerNodes: Seq[Node],
val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {
val authHelper = new AuthHelper(authorizer)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
var supportedApiKeys = Set(
ApiKeys.FETCH,
ApiKeys.METADATA,
//ApiKeys.SASL_HANDSHAKE
ApiKeys.API_VERSIONS,
ApiKeys.CREATE_TOPICS,
//ApiKeys.DELETE_TOPICS,
//ApiKeys.DESCRIBE_ACLS,
//ApiKeys.CREATE_ACLS,
//ApiKeys.DELETE_ACLS,
//ApiKeys.DESCRIBE_CONFIGS,
//ApiKeys.ALTER_CONFIGS,
//ApiKeys.SASL_AUTHENTICATE,
//ApiKeys.CREATE_PARTITIONS,
//ApiKeys.CREATE_DELEGATION_TOKEN
//ApiKeys.RENEW_DELEGATION_TOKEN
//ApiKeys.EXPIRE_DELEGATION_TOKEN
//ApiKeys.DESCRIBE_DELEGATION_TOKEN
//ApiKeys.ELECT_LEADERS
ApiKeys.INCREMENTAL_ALTER_CONFIGS,
//ApiKeys.ALTER_PARTITION_REASSIGNMENTS
//ApiKeys.LIST_PARTITION_REASSIGNMENTS
ApiKeys.ALTER_CLIENT_QUOTAS,
//ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS
//ApiKeys.ALTER_USER_SCRAM_CREDENTIALS
//ApiKeys.UPDATE_FEATURES
ApiKeys.ENVELOPE,
ApiKeys.VOTE,
ApiKeys.BEGIN_QUORUM_EPOCH,
ApiKeys.END_QUORUM_EPOCH,
ApiKeys.DESCRIBE_QUORUM,
ApiKeys.ALTER_ISR,
ApiKeys.BROKER_REGISTRATION,
ApiKeys.BROKER_HEARTBEAT,
ApiKeys.UNREGISTER_BROKER,
)
private def maybeHandleInvalidEnvelope(
envelope: RequestChannel.Request,
forwardedApiKey: ApiKeys
): Boolean = {
def sendEnvelopeError(error: Errors): Unit = {
requestHelper.sendErrorResponseMaybeThrottle(envelope, error.exception)
}
if (!authHelper.authorize(envelope.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
// Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
sendEnvelopeError(Errors.CLUSTER_AUTHORIZATION_FAILED)
true
} else if (!forwardedApiKey.forwardable) {
sendEnvelopeError(Errors.INVALID_REQUEST)
true
} else {
false
}
}
override def handle(request: RequestChannel.Request): Unit = {
try {
val handled = request.envelope.exists(envelope => {
maybeHandleInvalidEnvelope(envelope, request.header.apiKey)
})
if (handled)
return
request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
case ApiKeys.METADATA => handleMetadataRequest(request)
@ -146,7 +82,9 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
case ApiKeys.ENVELOPE => EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
case ApiKeys.ENVELOPE => handleEnvelopeRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey()}")
}
} catch {
@ -155,6 +93,27 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
def handleEnvelopeRequest(request: RequestChannel.Request): Unit = {
if (!authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
requestHelper.sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException(
s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope"))
} else {
EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
}
}
def handleSaslHandshakeRequest(request: RequestChannel.Request): Unit = {
val responseData = new SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
requestHelper.sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(responseData))
}
def handleSaslAuthenticateRequest(request: RequestChannel.Request): Unit = {
val responseData = new SaslAuthenticateResponseData()
.setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
.setErrorMessage("SaslAuthenticate request received after successful authentication")
requestHelper.sendResponseMaybeThrottle(request, _ => new SaslAuthenticateResponse(responseData))
}
private def handleFetch(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new FetchResponse[BaseRecords](response.asInstanceOf[FetchResponseData]))
@ -251,45 +210,17 @@ class ControllerApis(val requestChannel: RequestChannel,
// If this is considered to leak information about the broker version a workaround is to use SSL
// with client authentication which is performed at an earlier stage of the connection where the
// ApiVersionRequest is not available.
def createResponseCallback(features: FeatureMapAndEpoch,
requestThrottleMs: Int): ApiVersionsResponse = {
def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
val apiVersionRequest = request.body[ApiVersionsRequest]
if (apiVersionRequest.hasUnsupportedRequestVersion)
if (apiVersionRequest.hasUnsupportedRequestVersion) {
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
else if (!apiVersionRequest.isValid)
} else if (!apiVersionRequest.isValid) {
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
else {
val data = new ApiVersionsResponseData().
setErrorCode(0.toShort).
setThrottleTimeMs(requestThrottleMs).
setFinalizedFeaturesEpoch(features.epoch())
supportedFeatures.foreach {
case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey().
setName(k).setMaxVersion(v.max()).setMinVersion(v.min()))
}
// features.finalizedFeatures().asScala.foreach {
// case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
// setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
// }
ApiKeys.values().foreach {
key =>
if (supportedApiKeys.contains(key)) {
data.apiKeys().add(new ApiVersion().
setApiKey(key.id).
setMaxVersion(key.latestVersion()).
setMinVersion(key.oldestVersion()))
}
}
new ApiVersionsResponse(data)
} else {
apiVersionManager.apiVersionResponse(requestThrottleMs)
}
}
// FutureConverters.toScala(controller.finalizedFeatures()).onComplete {
// case Success(features) =>
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => createResponseCallback(new FeatureMapAndEpoch(
new FeatureMap(new util.HashMap()), 0), requestThrottleMs))
// case Failure(e) => requestHelper.handleError(request, e)
// }
requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
}
private def handleVote(request: RequestChannel.Request): Unit = {

View File

@ -168,7 +168,8 @@ class ControllerServer(
raftManager,
config,
metaProperties,
controllerNodes.toSeq)
controllerNodes.toSeq,
apiVersionManager)
controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel,
controllerApis,

View File

@ -23,10 +23,11 @@ import java.util.Properties
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager}
import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager, SimpleApiVersionManager}
import kafka.utils.MockTime
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.BrokerRegistrationRequestData
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.Errors
@ -40,6 +41,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import scala.jdk.CollectionConverters._
class ControllerApisTest {
@ -81,7 +83,8 @@ class ControllerApisTest {
raftManager,
new KafkaConfig(props),
MetaProperties(Uuid.fromString("JgxuGe9URy-E-ceaL04lEw"), nodeId = nodeId),
Seq.empty
Seq.empty,
new SimpleApiVersionManager(ListenerType.CONTROLLER)
)
}

View File

@ -269,7 +269,7 @@ class GroupModeTransactionsTest(Test):
@cluster(num_nodes=10)
@matrix(failure_mode=["hard_bounce", "clean_bounce"],
bounce_target=["brokers", "clients"], metadata_quorum=quorum.all_non_upgrade)
bounce_target=["brokers", "clients"])
def test_transactions(self, failure_mode, bounce_target, metadata_quorum=quorum.zk):
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol

View File

@ -244,8 +244,7 @@ class TransactionsTest(Test):
@matrix(failure_mode=["hard_bounce", "clean_bounce"],
bounce_target=["brokers", "clients"],
check_order=[True, False],
use_group_metadata=[True, False],
metadata_quorum=quorum.all_non_upgrade)
use_group_metadata=[True, False])
def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk):
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol