KAFKA-10852: AlterIsr should not be throttled (#9747)

Set it as a cluster action and update the handler in KafkaApis. We keep the `throttleTimeMs` field
since we intend to enable throttling in the future (especially relevant when we switch to the
built-in quorum mode).

Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
Ismael Juma 2020-12-14 22:28:47 -08:00 committed by GitHub
parent 78b74debe4
commit 5e5daf47ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 20 deletions

View File

@ -251,7 +251,7 @@ public enum ApiKeys {
EndQuorumEpochRequestData.SCHEMAS, EndQuorumEpochResponseData.SCHEMAS),
DESCRIBE_QUORUM(55, "DescribeQuorum", true, false,
DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS),
ALTER_ISR(56, "AlterIsr", AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS),
ALTER_ISR(56, "AlterIsr", true, AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS),
UPDATE_FEATURES(57, "UpdateFeatures",
UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS, true),
ENVELOPE(58, "Envelope", true, false, EnvelopeRequestData.SCHEMAS, EnvelopeResponseData.SCHEMAS);

View File

@ -20,11 +20,12 @@ import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.Schema;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.EnumSet;
import java.util.Set;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ApiKeysTest {
@ -43,6 +44,11 @@ public class ApiKeysTest {
ApiKeys.PRODUCE.requestSchema((short) ApiKeys.PRODUCE.requestSchemas.length);
}
@Test
public void testAlterIsrIsClusterAction() {
assertTrue(ApiKeys.ALTER_ISR.clusterAction);
}
/**
* All valid client responses which may be throttled should have a field named
* 'throttle_time_ms' to return the throttle time to the client. Exclusions are
@ -55,11 +61,14 @@ public class ApiKeysTest {
*/
@Test
public void testResponseThrottleTime() {
List<ApiKeys> authenticationKeys = Arrays.asList(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
Set<ApiKeys> authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
// Newer protocol apis include throttle time ms even for cluster actions
Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR);
for (ApiKeys apiKey: ApiKeys.values()) {
Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion());
BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name);
if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
if ((apiKey.clusterAction && !clusterActionsWithThrottleTimeMs.contains(apiKey))
|| authenticationKeys.contains(apiKey))
assertNull("Unexpected throttle time field: " + apiKey, throttleTimeField);
else
assertNotNull("Throttle time field missing: " + apiKey, throttleTimeField);

View File

@ -3238,20 +3238,15 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
val alterIsrRequest = request.body[AlterIsrRequest]
authorizeClusterOperation(request, CLUSTER_ACTION)
if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else if (!controller.isActive) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
alterIsrRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception()))
} else {
controller.alterIsrs(alterIsrRequest.data,
alterIsrResp => sendResponseMaybeThrottle(request, requestThrottleMs =>
new AlterIsrResponse(alterIsrResp.setThrottleTimeMs(requestThrottleMs))
)
if (!controller.isActive)
sendResponseExemptThrottle(request, alterIsrRequest.getErrorResponse(
AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.NOT_CONTROLLER.exception))
else
controller.alterIsrs(alterIsrRequest.data, alterIsrResp =>
sendResponseExemptThrottle(request, new AlterIsrResponse(alterIsrResp))
)
}
}
def handleUpdateFeatures(request: RequestChannel.Request): Unit = {

View File

@ -718,7 +718,7 @@ class RequestQuotaTest extends BaseRequestTest {
val exemptTarget = exemptRequestMetricValue + 0.02
val clientId = apiKey.toString
val client = Client(clientId, apiKey)
val updated = client.runUntil(response => exemptRequestMetricValue > exemptTarget)
val updated = client.runUntil(_ => exemptRequestMetricValue > exemptTarget)
assertTrue(s"Exempt-request-time metric not updated: $client", updated)
assertTrue(s"Client should not have been throttled: $client", throttleTimeMetricValue(clientId).isNaN)
@ -727,13 +727,13 @@ class RequestQuotaTest extends BaseRequestTest {
private def checkUnauthorizedRequestThrottle(apiKey: ApiKeys): Unit = {
val clientId = "unauthorized-" + apiKey.toString
val client = Client(clientId, apiKey)
val throttled = client.runUntil(response => throttleTimeMetricValue(clientId) > 0.0)
val throttled = client.runUntil(_ => throttleTimeMetricValue(clientId) > 0.0)
assertTrue(s"Unauthorized client should have been throttled: $client", throttled)
}
}
object RequestQuotaTest {
val ClusterActions = ApiKeys.enabledApis.asScala.toSet.filter(apiKey => apiKey.clusterAction)
val ClusterActions = ApiKeys.enabledApis.asScala.filter(_.clusterAction).toSet
val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
val ClientActions = ApiKeys.enabledApis.asScala.toSet -- ClusterActions -- SaslActions