diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index c48e6e10036..21e0b0ffe7d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -251,7 +251,7 @@ public enum ApiKeys { EndQuorumEpochRequestData.SCHEMAS, EndQuorumEpochResponseData.SCHEMAS), DESCRIBE_QUORUM(55, "DescribeQuorum", true, false, DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS), - ALTER_ISR(56, "AlterIsr", AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS), + ALTER_ISR(56, "AlterIsr", true, AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS), UPDATE_FEATURES(57, "UpdateFeatures", UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS, true), ENVELOPE(58, "Envelope", true, false, EnvelopeRequestData.SCHEMAS, EnvelopeResponseData.SCHEMAS); diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java index 78b59a5b542..5634fd202f7 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java @@ -20,11 +20,12 @@ import org.apache.kafka.common.protocol.types.BoundField; import org.apache.kafka.common.protocol.types.Schema; import org.junit.Test; -import java.util.Arrays; -import java.util.List; +import java.util.EnumSet; +import java.util.Set; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ApiKeysTest { @@ -43,6 +44,11 @@ public class ApiKeysTest { ApiKeys.PRODUCE.requestSchema((short) ApiKeys.PRODUCE.requestSchemas.length); } + @Test + public void testAlterIsrIsClusterAction() { + assertTrue(ApiKeys.ALTER_ISR.clusterAction); + } + /** * All valid client responses which may be throttled should have a field named * 'throttle_time_ms' to return the throttle time to the client. Exclusions are @@ -55,11 +61,14 @@ public class ApiKeysTest { */ @Test public void testResponseThrottleTime() { - List authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE); + Set authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE); + // Newer protocol apis include throttle time ms even for cluster actions + Set clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR); for (ApiKeys apiKey: ApiKeys.values()) { Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion()); BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name); - if (apiKey.clusterAction || authenticationKeys.contains(apiKey)) + if ((apiKey.clusterAction && !clusterActionsWithThrottleTimeMs.contains(apiKey)) + || authenticationKeys.contains(apiKey)) assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField); else assertNotNull("Throttle time field missing: " + apiKey, throttleTimeField); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4b25af6dba7..2e42368b746 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3238,20 +3238,15 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAlterIsrRequest(request: RequestChannel.Request): Unit = { val alterIsrRequest = request.body[AlterIsrRequest] + authorizeClusterOperation(request, CLUSTER_ACTION) - if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { - sendResponseMaybeThrottle(request, requestThrottleMs => - alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) - } else if (!controller.isActive) { - sendResponseMaybeThrottle(request, requestThrottleMs => - alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception())) - } else { - controller.alterIsrs(alterIsrRequest.data, - alterIsrResp => sendResponseMaybeThrottle(request, requestThrottleMs => - new AlterIsrResponse(alterIsrResp.setThrottleTimeMs(requestThrottleMs)) - ) + if (!controller.isActive) + sendResponseExemptThrottle(request, alterIsrRequest.getErrorResponse( + AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.NOT_CONTROLLER.exception)) + else + controller.alterIsrs(alterIsrRequest.data, alterIsrResp => + sendResponseExemptThrottle(request, new AlterIsrResponse(alterIsrResp)) ) - } } def handleUpdateFeatures(request: RequestChannel.Request): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index c79e0c0a4c8..bf22a40d7c5 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -718,7 +718,7 @@ class RequestQuotaTest extends BaseRequestTest { val exemptTarget = exemptRequestMetricValue + 0.02 val clientId = apiKey.toString val client = Client(clientId, apiKey) - val updated = client.runUntil(response => exemptRequestMetricValue > exemptTarget) + val updated = client.runUntil(_ => exemptRequestMetricValue > exemptTarget) assertTrue(s"Exempt-request-time metric not updated: $client", updated) assertTrue(s"Client should not have been throttled: $client", throttleTimeMetricValue(clientId).isNaN) @@ -727,13 +727,13 @@ class RequestQuotaTest extends BaseRequestTest { private def checkUnauthorizedRequestThrottle(apiKey: ApiKeys): Unit = { val clientId = "unauthorized-" + apiKey.toString val client = Client(clientId, apiKey) - val throttled = client.runUntil(response => throttleTimeMetricValue(clientId) > 0.0) + val throttled = client.runUntil(_ => throttleTimeMetricValue(clientId) > 0.0) assertTrue(s"Unauthorized client should have been throttled: $client", throttled) } } object RequestQuotaTest { - val ClusterActions = ApiKeys.enabledApis.asScala.toSet.filter(apiKey => apiKey.clusterAction) + val ClusterActions = ApiKeys.enabledApis.asScala.filter(_.clusterAction).toSet val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE) val ClientActions = ApiKeys.enabledApis.asScala.toSet -- ClusterActions -- SaslActions