KAFKA-18399 Remove ZooKeeper from KafkaApis (9/N): ALTER_CLIENT_QUOTAS and ALLOCATE_PRODUCER_IDS (#18465)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
mingdaoy authored 2025-01-15 05:06:16 +08:00, committed by GitHub
parent 730272e396
commit 9f955973fe
2 changed files with 1 addition and 99 deletions

core/src/main/scala/kafka/server/KafkaApis.scala

@@ -222,7 +222,6 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.UNREGISTER_BROKER => forwardToController(request)
         case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
         case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
-        case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
         case ApiKeys.DESCRIBE_QUORUM => forwardToController(request)
         case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
         case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
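
Note (not part of the patch): the deleted ALLOCATE_PRODUCER_IDS case was the last broker-side entry for that API, and ALTER_CLIENT_QUOTAS already reaches the controller via the generic forwarding path. A hedged sketch using the public ApiKeys metadata illustrates why no replacement cases are needed; forwardable and apisForListener are existing members of org.apache.kafka.common.protocol.ApiKeys, and the exact output depends on the Kafka version on the classpath.

    import org.apache.kafka.common.message.ApiMessageType.ListenerType
    import org.apache.kafka.common.protocol.ApiKeys

    object DispatchSketch extends App {
      // ALTER_CLIENT_QUOTAS is forwardable: a KRaft broker relays it to the controller.
      println(ApiKeys.ALTER_CLIENT_QUOTAS.forwardable)
      // ALLOCATE_PRODUCER_IDS is exposed on the controller listener, not the broker
      // listener, so the broker dispatcher (KafkaApis) should never receive it.
      println(ApiKeys.apisForListener(ListenerType.CONTROLLER).contains(ApiKeys.ALLOCATE_PRODUCER_IDS))
      println(ApiKeys.apisForListener(ListenerType.BROKER).contains(ApiKeys.ALLOCATE_PRODUCER_IDS))
    }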
@@ -2507,37 +2506,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }

-  def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
-    val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
-
-    if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
-      val result = zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala,
-        alterClientQuotasRequest.validateOnly)
-
-      val entriesData = result.iterator.map { case (quotaEntity, apiError) =>
-        val entityData = quotaEntity.entries.asScala.iterator.map { case (key, value) =>
-          new AlterClientQuotasResponseData.EntityData()
-            .setEntityType(key)
-            .setEntityName(value)
-        }.toBuffer
-
-        new AlterClientQuotasResponseData.EntryData()
-          .setErrorCode(apiError.error.code)
-          .setErrorMessage(apiError.message)
-          .setEntity(entityData.asJava)
-      }.toBuffer
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new AlterClientQuotasResponse(new AlterClientQuotasResponseData()
-          .setThrottleTimeMs(requestThrottleMs)
-          .setEntries(entriesData.asJava)))
-    } else {
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        alterClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
-    }
-  }
-
   def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
     val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest]
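
For context, the deleted ZK-mode handler above authorized ALTER_CONFIGS on CLUSTER and applied the alteration through the ZK admin manager; a KRaft broker instead forwards the request to the controller, so clients are unaffected. A minimal, hedged client-side sketch of the call that exercises this path (the bootstrap address, principal, and quota value are invented; Admin.alterClientQuotas and the quota classes are the real public API):

    import java.util.Collections
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
    import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}

    object AlterClientQuotasSketch extends App {
      val props = new java.util.Properties()
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
      val admin = Admin.create(props)
      try {
        // Cap user "alice" at ~1 MiB/s produce throughput (illustrative numbers).
        val entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "alice"))
        val op = new ClientQuotaAlteration.Op("producer_byte_rate", 1048576.0)
        val alteration = new ClientQuotaAlteration(entity, Collections.singletonList(op))
        // Fails with ClusterAuthorizationException without ALTER_CONFIGS on CLUSTER,
        // mirroring the error path of the removed handler.
        admin.alterClientQuotas(Collections.singletonList(alteration)).all().get()
      } finally admin.close()
    }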
@@ -2695,22 +2663,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       new ListTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }

-  def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
-    val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
-    val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest]
-
-    if (!zkSupport.controller.isActive)
-      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
-        allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, Errors.NOT_CONTROLLER.exception))
-    else
-      zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, producerIdsResponse =>
-        requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
-          new AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs)))
-      )
-  }
-
   private def groupVersion(): GroupVersion = {
     GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort))
   }
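
The deleted handler served the inter-broker AllocateProducerIds RPC on the ZK controller; under KRaft a broker's producer-ID manager sends the same RPC to the quorum controller, so KafkaApis drops it entirely. A hedged sketch of the request payload a broker sends when it asks for a block of producer IDs (the setters come from the generated AllocateProducerIdsRequestData class; the broker id and epoch values are invented):

    import org.apache.kafka.common.message.AllocateProducerIdsRequestData
    import org.apache.kafka.common.requests.AllocateProducerIdsRequest

    object AllocateProducerIdsSketch extends App {
      val data = new AllocateProducerIdsRequestData()
        .setBrokerId(1)      // requesting broker (invented id)
        .setBrokerEpoch(42L) // lets the controller fence stale brokers (invented epoch)
      // The response carries producerIdStart/producerIdLen, i.e. a contiguous ID block.
      val request = new AllocateProducerIdsRequest.Builder(data).build()
      println(request)
    }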

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

@@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException}
-import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
 import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
@@ -61,7 +61,6 @@ import org.apache.kafka.common.message._
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
-import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
 import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
@@ -730,48 +729,6 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedResults, responseMap)
   }

-  @Test
-  def testAlterClientQuotasWithAuthorizer(): Unit = {
-    val authorizer: Authorizer = mock(classOf[Authorizer])
-
-    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
-      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
-
-    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
-    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
-
-    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion, clientId, 0)
-
-    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
-      .build(requestHeader.apiVersion)
-    val request = buildRequest(alterClientQuotasRequest,
-      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
-
-    when(controller.isActive).thenReturn(true)
-    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      anyLong)).thenReturn(0)
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
-    kafkaApis.handleAlterClientQuotasRequest(request)
-
-    val capturedResponse = verifyNoThrottling[AlterClientQuotasResponse](request)
-    verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED))
-
-    verify(authorizer).authorize(any(), any())
-    verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong)
-  }
-
-  private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse,
-                                           expected: Map[ClientQuotaEntity, Errors]): Unit = {
-    val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
-    response.complete(futures.asJava)
-    futures.foreach {
-      case (entity, future) =>
-        future.whenComplete((_, thrown) =>
-          assertEquals(thrown, expected(entity).exception())
-        ).isDone
-    }
-  }
-
   @ParameterizedTest
   @CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
   def testKRaftControllerThrottleTimeEnforced(
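
The two deletions above removed the ZK-path authorization test and its helper. The response shape they asserted on is still produced, now by the controller: one EntryData per altered entity, carrying a per-entry error code. A hedged sketch of that shape, built with the same generated classes the removed code used (the entity values are invented):

    import java.util.Collections
    import org.apache.kafka.common.message.AlterClientQuotasResponseData
    import org.apache.kafka.common.protocol.Errors

    object QuotaResponseShapeSketch extends App {
      val entity = new AlterClientQuotasResponseData.EntityData()
        .setEntityType("user")  // ClientQuotaEntity.USER
        .setEntityName("alice") // invented principal
      val entry = new AlterClientQuotasResponseData.EntryData()
        .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code) // what the removed test expected
        .setErrorMessage(Errors.CLUSTER_AUTHORIZATION_FAILED.message)
        .setEntity(Collections.singletonList(entity))
      println(new AlterClientQuotasResponseData()
        .setThrottleTimeMs(0)
        .setEntries(Collections.singletonList(entry)))
    }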
@@ -10027,13 +9984,6 @@ class KafkaApisTest extends Logging {
         setResourceType(BROKER_LOGGER.id()))),
       response.data())
   }

-  @Test
-  def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
-    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
-    kafkaApis = createKafkaApis(raftSupport = true)
-    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
-  }
-
   @Test
   def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {