MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (#9030)

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, David Jacot <djacot@confluent.io>, Ron Dagostino <rdagostino@confluent.io>
This commit is contained in:
Rajini Sivaram 2020-07-17 08:55:53 +01:00 committed by GitHub
parent 796fae25c3
commit 99c64822a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 221 additions and 98 deletions

View File

@ -20,10 +20,12 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.AlterClientQuotasResponseData; import org.apache.kafka.common.message.AlterClientQuotasResponseData;
import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntityData; import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntityData;
import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntryData; import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntryData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaEntity;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
@ -50,14 +52,13 @@ public class AlterClientQuotasResponse extends AbstractResponse {
} }
public AlterClientQuotasResponse(Collection<ClientQuotaEntity> entities, int throttleTimeMs, Throwable e) { public AlterClientQuotasResponse(Collection<ClientQuotaEntity> entities, int throttleTimeMs, Throwable e) {
short errorCode = Errors.forException(e).code(); ApiError apiError = ApiError.fromThrowable(e);
String errorMessage = e.getMessage();
List<EntryData> entries = new ArrayList<>(entities.size()); List<EntryData> entries = new ArrayList<>(entities.size());
for (ClientQuotaEntity entity : entities) { for (ClientQuotaEntity entity : entities) {
entries.add(new EntryData() entries.add(new EntryData()
.setErrorCode(errorCode) .setErrorCode(apiError.error().code())
.setErrorMessage(errorMessage) .setErrorMessage(apiError.message())
.setEntity(toEntityData(entity))); .setEntity(toEntityData(entity)));
} }
@ -120,4 +121,8 @@ public class AlterClientQuotasResponse extends AbstractResponse {
} }
return entityData; return entityData;
} }
public static AlterClientQuotasResponse parse(ByteBuffer buffer, short version) {
return new AlterClientQuotasResponse(ApiKeys.ALTER_CLIENT_QUOTAS.parseResponse(version, buffer), version);
}
} }

View File

@ -38,9 +38,10 @@ public class ApiError {
private final String message; private final String message;
public static ApiError fromThrowable(Throwable t) { public static ApiError fromThrowable(Throwable t) {
// Avoid populating the error message if it's a generic one // Avoid populating the error message if it's a generic one. Also don't populate error
// message for UNKNOWN_SERVER_ERROR to ensure we don't leak sensitive information.
Errors error = Errors.forException(t); Errors error = Errors.forException(t);
String message = error.message().equals(t.getMessage()) ? null : t.getMessage(); String message = error == Errors.UNKNOWN_SERVER_ERROR || error.message().equals(t.getMessage()) ? null : t.getMessage();
return new ApiError(error, message); return new ApiError(error, message);
} }

View File

@ -21,10 +21,12 @@ import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData; import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData; import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData; import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaEntity;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -66,10 +68,11 @@ public class DescribeClientQuotasResponse extends AbstractResponse {
} }
public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) { public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
this.data = new DescribeClientQuotasResponseData() this.data = new DescribeClientQuotasResponseData()
.setThrottleTimeMs(throttleTimeMs) .setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code()) .setErrorCode(apiError.error().code())
.setErrorMessage(e.getMessage()) .setErrorMessage(apiError.message())
.setEntries(null); .setEntries(null);
} }
@ -115,4 +118,8 @@ public class DescribeClientQuotasResponse extends AbstractResponse {
protected Struct toStruct(short version) { protected Struct toStruct(short version) {
return data.toStruct(version); return data.toStruct(version);
} }
public static DescribeClientQuotasResponse parse(ByteBuffer buffer, short version) {
return new DescribeClientQuotasResponse(ApiKeys.DESCRIBE_CLIENT_QUOTAS.parseResponse(version, buffer), version);
}
} }

View File

@ -77,9 +77,10 @@ public class SaslAuthenticateRequest extends AbstractRequest {
@Override @Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
ApiError apiError = ApiError.fromThrowable(e);
SaslAuthenticateResponseData response = new SaslAuthenticateResponseData() SaslAuthenticateResponseData response = new SaslAuthenticateResponseData()
.setErrorCode(ApiError.fromThrowable(e).error().code()) .setErrorCode(apiError.error().code())
.setErrorMessage(e.getMessage()); .setErrorMessage(apiError.message());
return new SaslAuthenticateResponse(response); return new SaslAuthenticateResponse(response);
} }

View File

@ -147,6 +147,9 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.RecordBatch;
@ -197,18 +200,21 @@ import static org.junit.Assert.fail;
public class RequestResponseTest { public class RequestResponseTest {
// Exception includes a message that we verify is not included in error responses
private final UnknownServerException unknownServerException = new UnknownServerException("secret");
@Test @Test
public void testSerialization() throws Exception { public void testSerialization() throws Exception {
checkRequest(createFindCoordinatorRequest(0), true); checkRequest(createFindCoordinatorRequest(0), true);
checkRequest(createFindCoordinatorRequest(1), true); checkRequest(createFindCoordinatorRequest(1), true);
checkErrorResponse(createFindCoordinatorRequest(0), new UnknownServerException(), true); checkErrorResponse(createFindCoordinatorRequest(0), unknownServerException, true);
checkErrorResponse(createFindCoordinatorRequest(1), new UnknownServerException(), true); checkErrorResponse(createFindCoordinatorRequest(1), unknownServerException, true);
checkResponse(createFindCoordinatorResponse(), 0, true); checkResponse(createFindCoordinatorResponse(), 0, true);
checkResponse(createFindCoordinatorResponse(), 1, true); checkResponse(createFindCoordinatorResponse(), 1, true);
checkRequest(createControlledShutdownRequest(), true); checkRequest(createControlledShutdownRequest(), true);
checkResponse(createControlledShutdownResponse(), 1, true); checkResponse(createControlledShutdownResponse(), 1, true);
checkErrorResponse(createControlledShutdownRequest(), new UnknownServerException(), true); checkErrorResponse(createControlledShutdownRequest(), unknownServerException, true);
checkErrorResponse(createControlledShutdownRequest(0), new UnknownServerException(), true); checkErrorResponse(createControlledShutdownRequest(0), unknownServerException, true);
checkRequest(createFetchRequest(4), true); checkRequest(createFetchRequest(4), true);
checkResponse(createFetchResponse(), 4, true); checkResponse(createFetchResponse(), 4, true);
List<TopicPartition> toForgetTopics = new ArrayList<>(); List<TopicPartition> toForgetTopics = new ArrayList<>();
@ -218,53 +224,53 @@ public class RequestResponseTest {
checkRequest(createFetchRequest(7, new FetchMetadata(123, 456), toForgetTopics), true); checkRequest(createFetchRequest(7, new FetchMetadata(123, 456), toForgetTopics), true);
checkResponse(createFetchResponse(123), 7, true); checkResponse(createFetchResponse(123), 7, true);
checkResponse(createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123), 7, true); checkResponse(createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123), 7, true);
checkErrorResponse(createFetchRequest(4), new UnknownServerException(), true); checkErrorResponse(createFetchRequest(4), unknownServerException, true);
checkRequest(createHeartBeatRequest(), true); checkRequest(createHeartBeatRequest(), true);
checkErrorResponse(createHeartBeatRequest(), new UnknownServerException(), true); checkErrorResponse(createHeartBeatRequest(), unknownServerException, true);
checkResponse(createHeartBeatResponse(), 0, true); checkResponse(createHeartBeatResponse(), 0, true);
for (int v = ApiKeys.JOIN_GROUP.oldestVersion(); v <= ApiKeys.JOIN_GROUP.latestVersion(); v++) { for (int v = ApiKeys.JOIN_GROUP.oldestVersion(); v <= ApiKeys.JOIN_GROUP.latestVersion(); v++) {
checkRequest(createJoinGroupRequest(v), true); checkRequest(createJoinGroupRequest(v), true);
checkErrorResponse(createJoinGroupRequest(v), new UnknownServerException(), true); checkErrorResponse(createJoinGroupRequest(v), unknownServerException, true);
checkResponse(createJoinGroupResponse(v), v, true); checkResponse(createJoinGroupResponse(v), v, true);
} }
for (int v = ApiKeys.SYNC_GROUP.oldestVersion(); v <= ApiKeys.SYNC_GROUP.latestVersion(); v++) { for (int v = ApiKeys.SYNC_GROUP.oldestVersion(); v <= ApiKeys.SYNC_GROUP.latestVersion(); v++) {
checkRequest(createSyncGroupRequest(v), true); checkRequest(createSyncGroupRequest(v), true);
checkErrorResponse(createSyncGroupRequest(v), new UnknownServerException(), true); checkErrorResponse(createSyncGroupRequest(v), unknownServerException, true);
checkResponse(createSyncGroupResponse(v), v, true); checkResponse(createSyncGroupResponse(v), v, true);
} }
checkRequest(createLeaveGroupRequest(), true); checkRequest(createLeaveGroupRequest(), true);
checkErrorResponse(createLeaveGroupRequest(), new UnknownServerException(), true); checkErrorResponse(createLeaveGroupRequest(), unknownServerException, true);
checkResponse(createLeaveGroupResponse(), 0, true); checkResponse(createLeaveGroupResponse(), 0, true);
for (short v = ApiKeys.LIST_GROUPS.oldestVersion(); v <= ApiKeys.LIST_GROUPS.latestVersion(); v++) { for (short v = ApiKeys.LIST_GROUPS.oldestVersion(); v <= ApiKeys.LIST_GROUPS.latestVersion(); v++) {
checkRequest(createListGroupsRequest(v), false); checkRequest(createListGroupsRequest(v), false);
checkErrorResponse(createListGroupsRequest(v), new UnknownServerException(), true); checkErrorResponse(createListGroupsRequest(v), unknownServerException, true);
checkResponse(createListGroupsResponse(v), v, true); checkResponse(createListGroupsResponse(v), v, true);
} }
checkRequest(createDescribeGroupRequest(), true); checkRequest(createDescribeGroupRequest(), true);
checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException(), true); checkErrorResponse(createDescribeGroupRequest(), unknownServerException, true);
checkResponse(createDescribeGroupResponse(), 0, true); checkResponse(createDescribeGroupResponse(), 0, true);
checkRequest(createDeleteGroupsRequest(), true); checkRequest(createDeleteGroupsRequest(), true);
checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException(), true); checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true);
checkResponse(createDeleteGroupsResponse(), 0, true); checkResponse(createDeleteGroupsResponse(), 0, true);
for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) { for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) {
checkRequest(createListOffsetRequest(i), true); checkRequest(createListOffsetRequest(i), true);
checkErrorResponse(createListOffsetRequest(i), new UnknownServerException(), true); checkErrorResponse(createListOffsetRequest(i), unknownServerException, true);
checkResponse(createListOffsetResponse(i), i, true); checkResponse(createListOffsetResponse(i), i, true);
} }
checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true); checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true);
checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true); checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true);
checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true); checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true);
checkResponse(createMetadataResponse(), 2, true); checkResponse(createMetadataResponse(), 2, true);
checkErrorResponse(createMetadataRequest(2, Collections.singletonList("topic1")), new UnknownServerException(), true); checkErrorResponse(createMetadataRequest(2, Collections.singletonList("topic1")), unknownServerException, true);
checkResponse(createMetadataResponse(), 3, true); checkResponse(createMetadataResponse(), 3, true);
checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), new UnknownServerException(), true); checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), unknownServerException, true);
checkResponse(createMetadataResponse(), 4, true); checkResponse(createMetadataResponse(), 4, true);
checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), new UnknownServerException(), true); checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), unknownServerException, true);
checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true); checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true);
checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true); checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true);
checkErrorResponse(createOffsetFetchRequestForAllPartition("group1", false), new NotCoordinatorException("Not Coordinator"), true); checkErrorResponse(createOffsetFetchRequestForAllPartition("group1", false), new NotCoordinatorException("Not Coordinator"), true);
@ -275,42 +281,42 @@ public class RequestResponseTest {
checkRequest(createOffsetFetchRequest(7, true), true); checkRequest(createOffsetFetchRequest(7, true), true);
checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true); checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true);
checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true); checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true);
checkErrorResponse(createOffsetFetchRequest(0, false), new UnknownServerException(), true); checkErrorResponse(createOffsetFetchRequest(0, false), unknownServerException, true);
checkErrorResponse(createOffsetFetchRequest(1, false), new UnknownServerException(), true); checkErrorResponse(createOffsetFetchRequest(1, false), unknownServerException, true);
checkErrorResponse(createOffsetFetchRequest(2, false), new UnknownServerException(), true); checkErrorResponse(createOffsetFetchRequest(2, false), unknownServerException, true);
checkErrorResponse(createOffsetFetchRequest(7, true), new UnknownServerException(), true); checkErrorResponse(createOffsetFetchRequest(7, true), unknownServerException, true);
checkResponse(createOffsetFetchResponse(), 0, true); checkResponse(createOffsetFetchResponse(), 0, true);
checkRequest(createProduceRequest(2), true); checkRequest(createProduceRequest(2), true);
checkErrorResponse(createProduceRequest(2), new UnknownServerException(), true); checkErrorResponse(createProduceRequest(2), unknownServerException, true);
checkRequest(createProduceRequest(3), true); checkRequest(createProduceRequest(3), true);
checkErrorResponse(createProduceRequest(3), new UnknownServerException(), true); checkErrorResponse(createProduceRequest(3), unknownServerException, true);
checkResponse(createProduceResponse(), 2, true); checkResponse(createProduceResponse(), 2, true);
checkResponse(createProduceResponseWithErrorMessage(), 8, true); checkResponse(createProduceResponseWithErrorMessage(), 8, true);
for (int v = ApiKeys.STOP_REPLICA.oldestVersion(); v <= ApiKeys.STOP_REPLICA.latestVersion(); v++) { for (int v = ApiKeys.STOP_REPLICA.oldestVersion(); v <= ApiKeys.STOP_REPLICA.latestVersion(); v++) {
checkRequest(createStopReplicaRequest(v, true), true); checkRequest(createStopReplicaRequest(v, true), true);
checkRequest(createStopReplicaRequest(v, false), true); checkRequest(createStopReplicaRequest(v, false), true);
checkErrorResponse(createStopReplicaRequest(v, true), new UnknownServerException(), true); checkErrorResponse(createStopReplicaRequest(v, true), unknownServerException, true);
checkErrorResponse(createStopReplicaRequest(v, false), new UnknownServerException(), true); checkErrorResponse(createStopReplicaRequest(v, false), unknownServerException, true);
checkResponse(createStopReplicaResponse(), v, true); checkResponse(createStopReplicaResponse(), v, true);
} }
checkRequest(createLeaderAndIsrRequest(0), true); checkRequest(createLeaderAndIsrRequest(0), true);
checkErrorResponse(createLeaderAndIsrRequest(0), new UnknownServerException(), false); checkErrorResponse(createLeaderAndIsrRequest(0), unknownServerException, false);
checkRequest(createLeaderAndIsrRequest(1), true); checkRequest(createLeaderAndIsrRequest(1), true);
checkErrorResponse(createLeaderAndIsrRequest(1), new UnknownServerException(), false); checkErrorResponse(createLeaderAndIsrRequest(1), unknownServerException, false);
checkRequest(createLeaderAndIsrRequest(2), true); checkRequest(createLeaderAndIsrRequest(2), true);
checkErrorResponse(createLeaderAndIsrRequest(2), new UnknownServerException(), false); checkErrorResponse(createLeaderAndIsrRequest(2), unknownServerException, false);
checkResponse(createLeaderAndIsrResponse(), 0, true); checkResponse(createLeaderAndIsrResponse(), 0, true);
checkRequest(createSaslHandshakeRequest(), true); checkRequest(createSaslHandshakeRequest(), true);
checkErrorResponse(createSaslHandshakeRequest(), new UnknownServerException(), true); checkErrorResponse(createSaslHandshakeRequest(), unknownServerException, true);
checkResponse(createSaslHandshakeResponse(), 0, true); checkResponse(createSaslHandshakeResponse(), 0, true);
checkRequest(createSaslAuthenticateRequest(), true); checkRequest(createSaslAuthenticateRequest(), true);
checkErrorResponse(createSaslAuthenticateRequest(), new UnknownServerException(), true); checkErrorResponse(createSaslAuthenticateRequest(), unknownServerException, true);
checkResponse(createSaslAuthenticateResponse(), 0, true); checkResponse(createSaslAuthenticateResponse(), 0, true);
checkResponse(createSaslAuthenticateResponse(), 1, true); checkResponse(createSaslAuthenticateResponse(), 1, true);
checkRequest(createApiVersionRequest(), true); checkRequest(createApiVersionRequest(), true);
checkErrorResponse(createApiVersionRequest(), new UnknownServerException(), true); checkErrorResponse(createApiVersionRequest(), unknownServerException, true);
checkErrorResponse(createApiVersionRequest(), new UnsupportedVersionException("Not Supported"), true); checkErrorResponse(createApiVersionRequest(), new UnsupportedVersionException("Not Supported"), true);
checkResponse(createApiVersionResponse(), 0, true); checkResponse(createApiVersionResponse(), 0, true);
checkResponse(createApiVersionResponse(), 1, true); checkResponse(createApiVersionResponse(), 1, true);
@ -322,107 +328,107 @@ public class RequestResponseTest {
checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 3, true); checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 3, true);
checkRequest(createCreateTopicRequest(0), true); checkRequest(createCreateTopicRequest(0), true);
checkErrorResponse(createCreateTopicRequest(0), new UnknownServerException(), true); checkErrorResponse(createCreateTopicRequest(0), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 0, true); checkResponse(createCreateTopicResponse(), 0, true);
checkRequest(createCreateTopicRequest(1), true); checkRequest(createCreateTopicRequest(1), true);
checkErrorResponse(createCreateTopicRequest(1), new UnknownServerException(), true); checkErrorResponse(createCreateTopicRequest(1), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 1, true); checkResponse(createCreateTopicResponse(), 1, true);
checkRequest(createCreateTopicRequest(2), true); checkRequest(createCreateTopicRequest(2), true);
checkErrorResponse(createCreateTopicRequest(2), new UnknownServerException(), true); checkErrorResponse(createCreateTopicRequest(2), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 2, true); checkResponse(createCreateTopicResponse(), 2, true);
checkRequest(createCreateTopicRequest(3), true); checkRequest(createCreateTopicRequest(3), true);
checkErrorResponse(createCreateTopicRequest(3), new UnknownServerException(), true); checkErrorResponse(createCreateTopicRequest(3), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 3, true); checkResponse(createCreateTopicResponse(), 3, true);
checkRequest(createCreateTopicRequest(4), true); checkRequest(createCreateTopicRequest(4), true);
checkErrorResponse(createCreateTopicRequest(4), new UnknownServerException(), true); checkErrorResponse(createCreateTopicRequest(4), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 4, true); checkResponse(createCreateTopicResponse(), 4, true);
checkRequest(createCreateTopicRequest(5), true); checkRequest(createCreateTopicRequest(5), true);
checkErrorResponse(createCreateTopicRequest(5), new UnknownServerException(), true); checkErrorResponse(createCreateTopicRequest(5), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 5, true); checkResponse(createCreateTopicResponse(), 5, true);
checkRequest(createDeleteTopicsRequest(), true); checkRequest(createDeleteTopicsRequest(), true);
checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException(), true); checkErrorResponse(createDeleteTopicsRequest(), unknownServerException, true);
checkResponse(createDeleteTopicsResponse(), 0, true); checkResponse(createDeleteTopicsResponse(), 0, true);
checkRequest(createInitPidRequest(), true); checkRequest(createInitPidRequest(), true);
checkErrorResponse(createInitPidRequest(), new UnknownServerException(), true); checkErrorResponse(createInitPidRequest(), unknownServerException, true);
checkResponse(createInitPidResponse(), 0, true); checkResponse(createInitPidResponse(), 0, true);
checkRequest(createAddPartitionsToTxnRequest(), true); checkRequest(createAddPartitionsToTxnRequest(), true);
checkResponse(createAddPartitionsToTxnResponse(), 0, true); checkResponse(createAddPartitionsToTxnResponse(), 0, true);
checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true); checkErrorResponse(createAddPartitionsToTxnRequest(), unknownServerException, true);
checkRequest(createAddOffsetsToTxnRequest(), true); checkRequest(createAddOffsetsToTxnRequest(), true);
checkResponse(createAddOffsetsToTxnResponse(), 0, true); checkResponse(createAddOffsetsToTxnResponse(), 0, true);
checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException(), true); checkErrorResponse(createAddOffsetsToTxnRequest(), unknownServerException, true);
checkRequest(createEndTxnRequest(), true); checkRequest(createEndTxnRequest(), true);
checkResponse(createEndTxnResponse(), 0, true); checkResponse(createEndTxnResponse(), 0, true);
checkErrorResponse(createEndTxnRequest(), new UnknownServerException(), true); checkErrorResponse(createEndTxnRequest(), unknownServerException, true);
checkRequest(createWriteTxnMarkersRequest(), true); checkRequest(createWriteTxnMarkersRequest(), true);
checkResponse(createWriteTxnMarkersResponse(), 0, true); checkResponse(createWriteTxnMarkersResponse(), 0, true);
checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException(), true); checkErrorResponse(createWriteTxnMarkersRequest(), unknownServerException, true);
checkOlderFetchVersions(); checkOlderFetchVersions();
checkResponse(createMetadataResponse(), 0, true); checkResponse(createMetadataResponse(), 0, true);
checkResponse(createMetadataResponse(), 1, true); checkResponse(createMetadataResponse(), 1, true);
checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true); checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true);
checkRequest(createOffsetCommitRequest(0), true); checkRequest(createOffsetCommitRequest(0), true);
checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException(), true); checkErrorResponse(createOffsetCommitRequest(0), unknownServerException, true);
checkRequest(createOffsetCommitRequest(1), true); checkRequest(createOffsetCommitRequest(1), true);
checkErrorResponse(createOffsetCommitRequest(1), new UnknownServerException(), true); checkErrorResponse(createOffsetCommitRequest(1), unknownServerException, true);
checkRequest(createOffsetCommitRequest(2), true); checkRequest(createOffsetCommitRequest(2), true);
checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException(), true); checkErrorResponse(createOffsetCommitRequest(2), unknownServerException, true);
checkRequest(createOffsetCommitRequest(3), true); checkRequest(createOffsetCommitRequest(3), true);
checkErrorResponse(createOffsetCommitRequest(3), new UnknownServerException(), true); checkErrorResponse(createOffsetCommitRequest(3), unknownServerException, true);
checkRequest(createOffsetCommitRequest(4), true); checkRequest(createOffsetCommitRequest(4), true);
checkErrorResponse(createOffsetCommitRequest(4), new UnknownServerException(), true); checkErrorResponse(createOffsetCommitRequest(4), unknownServerException, true);
checkResponse(createOffsetCommitResponse(), 4, true); checkResponse(createOffsetCommitResponse(), 4, true);
checkRequest(createOffsetCommitRequest(5), true); checkRequest(createOffsetCommitRequest(5), true);
checkErrorResponse(createOffsetCommitRequest(5), new UnknownServerException(), true); checkErrorResponse(createOffsetCommitRequest(5), unknownServerException, true);
checkResponse(createOffsetCommitResponse(), 5, true); checkResponse(createOffsetCommitResponse(), 5, true);
checkRequest(createJoinGroupRequest(0), true); checkRequest(createJoinGroupRequest(0), true);
checkRequest(createUpdateMetadataRequest(0, null), false); checkRequest(createUpdateMetadataRequest(0, null), false);
checkErrorResponse(createUpdateMetadataRequest(0, null), new UnknownServerException(), true); checkErrorResponse(createUpdateMetadataRequest(0, null), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(1, null), false); checkRequest(createUpdateMetadataRequest(1, null), false);
checkRequest(createUpdateMetadataRequest(1, "rack1"), false); checkRequest(createUpdateMetadataRequest(1, "rack1"), false);
checkErrorResponse(createUpdateMetadataRequest(1, null), new UnknownServerException(), true); checkErrorResponse(createUpdateMetadataRequest(1, null), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(2, "rack1"), false); checkRequest(createUpdateMetadataRequest(2, "rack1"), false);
checkRequest(createUpdateMetadataRequest(2, null), false); checkRequest(createUpdateMetadataRequest(2, null), false);
checkErrorResponse(createUpdateMetadataRequest(2, "rack1"), new UnknownServerException(), true); checkErrorResponse(createUpdateMetadataRequest(2, "rack1"), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(3, "rack1"), false); checkRequest(createUpdateMetadataRequest(3, "rack1"), false);
checkRequest(createUpdateMetadataRequest(3, null), false); checkRequest(createUpdateMetadataRequest(3, null), false);
checkErrorResponse(createUpdateMetadataRequest(3, "rack1"), new UnknownServerException(), true); checkErrorResponse(createUpdateMetadataRequest(3, "rack1"), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(4, "rack1"), false); checkRequest(createUpdateMetadataRequest(4, "rack1"), false);
checkRequest(createUpdateMetadataRequest(4, null), false); checkRequest(createUpdateMetadataRequest(4, null), false);
checkErrorResponse(createUpdateMetadataRequest(4, "rack1"), new UnknownServerException(), true); checkErrorResponse(createUpdateMetadataRequest(4, "rack1"), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(5, "rack1"), false); checkRequest(createUpdateMetadataRequest(5, "rack1"), false);
checkRequest(createUpdateMetadataRequest(5, null), false); checkRequest(createUpdateMetadataRequest(5, null), false);
checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), new UnknownServerException(), true); checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), unknownServerException, true);
checkResponse(createUpdateMetadataResponse(), 0, true); checkResponse(createUpdateMetadataResponse(), 0, true);
checkRequest(createListOffsetRequest(0), true); checkRequest(createListOffsetRequest(0), true);
checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true); checkErrorResponse(createListOffsetRequest(0), unknownServerException, true);
checkResponse(createListOffsetResponse(0), 0, true); checkResponse(createListOffsetResponse(0), 0, true);
checkRequest(createLeaderEpochRequestForReplica(0, 1), true); checkRequest(createLeaderEpochRequestForReplica(0, 1), true);
checkRequest(createLeaderEpochRequestForConsumer(), true); checkRequest(createLeaderEpochRequestForConsumer(), true);
checkResponse(createLeaderEpochResponse(), 0, true); checkResponse(createLeaderEpochResponse(), 0, true);
checkErrorResponse(createLeaderEpochRequestForConsumer(), new UnknownServerException(), true); checkErrorResponse(createLeaderEpochRequestForConsumer(), unknownServerException, true);
checkRequest(createAddPartitionsToTxnRequest(), true); checkRequest(createAddPartitionsToTxnRequest(), true);
checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true); checkErrorResponse(createAddPartitionsToTxnRequest(), unknownServerException, true);
checkResponse(createAddPartitionsToTxnResponse(), 0, true); checkResponse(createAddPartitionsToTxnResponse(), 0, true);
checkRequest(createAddOffsetsToTxnRequest(), true); checkRequest(createAddOffsetsToTxnRequest(), true);
checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException(), true); checkErrorResponse(createAddOffsetsToTxnRequest(), unknownServerException, true);
checkResponse(createAddOffsetsToTxnResponse(), 0, true); checkResponse(createAddOffsetsToTxnResponse(), 0, true);
checkRequest(createEndTxnRequest(), true); checkRequest(createEndTxnRequest(), true);
checkErrorResponse(createEndTxnRequest(), new UnknownServerException(), true); checkErrorResponse(createEndTxnRequest(), unknownServerException, true);
checkResponse(createEndTxnResponse(), 0, true); checkResponse(createEndTxnResponse(), 0, true);
checkRequest(createWriteTxnMarkersRequest(), true); checkRequest(createWriteTxnMarkersRequest(), true);
checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException(), true); checkErrorResponse(createWriteTxnMarkersRequest(), unknownServerException, true);
checkResponse(createWriteTxnMarkersResponse(), 0, true); checkResponse(createWriteTxnMarkersResponse(), 0, true);
checkRequest(createTxnOffsetCommitRequest(0), true); checkRequest(createTxnOffsetCommitRequest(0), true);
checkRequest(createTxnOffsetCommitRequest(3), true); checkRequest(createTxnOffsetCommitRequest(3), true);
checkRequest(createTxnOffsetCommitRequestWithAutoDowngrade(2), true); checkRequest(createTxnOffsetCommitRequestWithAutoDowngrade(2), true);
checkErrorResponse(createTxnOffsetCommitRequest(0), new UnknownServerException(), true); checkErrorResponse(createTxnOffsetCommitRequest(0), unknownServerException, true);
checkErrorResponse(createTxnOffsetCommitRequest(3), new UnknownServerException(), true); checkErrorResponse(createTxnOffsetCommitRequest(3), unknownServerException, true);
checkErrorResponse(createTxnOffsetCommitRequestWithAutoDowngrade(2), new UnknownServerException(), true); checkErrorResponse(createTxnOffsetCommitRequestWithAutoDowngrade(2), unknownServerException, true);
checkResponse(createTxnOffsetCommitResponse(), 0, true); checkResponse(createTxnOffsetCommitResponse(), 0, true);
checkRequest(createDescribeAclsRequest(), true); checkRequest(createDescribeAclsRequest(), true);
checkErrorResponse(createDescribeAclsRequest(), new SecurityDisabledException("Security is not enabled."), true); checkErrorResponse(createDescribeAclsRequest(), new SecurityDisabledException("Security is not enabled."), true);
@ -434,18 +440,18 @@ public class RequestResponseTest {
checkErrorResponse(createDeleteAclsRequest(), new SecurityDisabledException("Security is not enabled."), true); checkErrorResponse(createDeleteAclsRequest(), new SecurityDisabledException("Security is not enabled."), true);
checkResponse(createDeleteAclsResponse(), ApiKeys.DELETE_ACLS.latestVersion(), true); checkResponse(createDeleteAclsResponse(), ApiKeys.DELETE_ACLS.latestVersion(), true);
checkRequest(createAlterConfigsRequest(), false); checkRequest(createAlterConfigsRequest(), false);
checkErrorResponse(createAlterConfigsRequest(), new UnknownServerException(), true); checkErrorResponse(createAlterConfigsRequest(), unknownServerException, true);
checkResponse(createAlterConfigsResponse(), 0, false); checkResponse(createAlterConfigsResponse(), 0, false);
checkRequest(createDescribeConfigsRequest(0), true); checkRequest(createDescribeConfigsRequest(0), true);
checkRequest(createDescribeConfigsRequestWithConfigEntries(0), false); checkRequest(createDescribeConfigsRequestWithConfigEntries(0), false);
checkErrorResponse(createDescribeConfigsRequest(0), new UnknownServerException(), true); checkErrorResponse(createDescribeConfigsRequest(0), unknownServerException, true);
checkResponse(createDescribeConfigsResponse((short) 0), 0, false); checkResponse(createDescribeConfigsResponse((short) 0), 0, false);
checkRequest(createDescribeConfigsRequest(1), true); checkRequest(createDescribeConfigsRequest(1), true);
checkRequest(createDescribeConfigsRequestWithConfigEntries(1), false); checkRequest(createDescribeConfigsRequestWithConfigEntries(1), false);
checkRequest(createDescribeConfigsRequestWithDocumentation(1), false); checkRequest(createDescribeConfigsRequestWithDocumentation(1), false);
checkRequest(createDescribeConfigsRequestWithDocumentation(2), false); checkRequest(createDescribeConfigsRequestWithDocumentation(2), false);
checkRequest(createDescribeConfigsRequestWithDocumentation(3), false); checkRequest(createDescribeConfigsRequestWithDocumentation(3), false);
checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException(), true); checkErrorResponse(createDescribeConfigsRequest(1), unknownServerException, true);
checkResponse(createDescribeConfigsResponse((short) 1), 1, false); checkResponse(createDescribeConfigsResponse((short) 1), 1, false);
checkDescribeConfigsResponseVersions(); checkDescribeConfigsResponseVersions();
checkRequest(createCreatePartitionsRequest(), true); checkRequest(createCreatePartitionsRequest(), true);
@ -453,36 +459,43 @@ public class RequestResponseTest {
checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException(), true); checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException(), true);
checkResponse(createCreatePartitionsResponse(), 0, true); checkResponse(createCreatePartitionsResponse(), 0, true);
checkRequest(createCreateTokenRequest(), true); checkRequest(createCreateTokenRequest(), true);
checkErrorResponse(createCreateTokenRequest(), new UnknownServerException(), true); checkErrorResponse(createCreateTokenRequest(), unknownServerException, true);
checkResponse(createCreateTokenResponse(), 0, true); checkResponse(createCreateTokenResponse(), 0, true);
checkRequest(createDescribeTokenRequest(), true); checkRequest(createDescribeTokenRequest(), true);
checkErrorResponse(createDescribeTokenRequest(), new UnknownServerException(), true); checkErrorResponse(createDescribeTokenRequest(), unknownServerException, true);
checkResponse(createDescribeTokenResponse(), 0, true); checkResponse(createDescribeTokenResponse(), 0, true);
checkRequest(createExpireTokenRequest(), true); checkRequest(createExpireTokenRequest(), true);
checkErrorResponse(createExpireTokenRequest(), new UnknownServerException(), true); checkErrorResponse(createExpireTokenRequest(), unknownServerException, true);
checkResponse(createExpireTokenResponse(), 0, true); checkResponse(createExpireTokenResponse(), 0, true);
checkRequest(createRenewTokenRequest(), true); checkRequest(createRenewTokenRequest(), true);
checkErrorResponse(createRenewTokenRequest(), new UnknownServerException(), true); checkErrorResponse(createRenewTokenRequest(), unknownServerException, true);
checkResponse(createRenewTokenResponse(), 0, true); checkResponse(createRenewTokenResponse(), 0, true);
checkRequest(createElectLeadersRequest(), true); checkRequest(createElectLeadersRequest(), true);
checkRequest(createElectLeadersRequestNullPartitions(), true); checkRequest(createElectLeadersRequestNullPartitions(), true);
checkErrorResponse(createElectLeadersRequest(), new UnknownServerException(), true); checkErrorResponse(createElectLeadersRequest(), unknownServerException, true);
checkResponse(createElectLeadersResponse(), 1, true); checkResponse(createElectLeadersResponse(), 1, true);
checkRequest(createIncrementalAlterConfigsRequest(), true); checkRequest(createIncrementalAlterConfigsRequest(), true);
checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException(), true); checkErrorResponse(createIncrementalAlterConfigsRequest(), unknownServerException, true);
checkResponse(createIncrementalAlterConfigsResponse(), 0, true); checkResponse(createIncrementalAlterConfigsResponse(), 0, true);
checkRequest(createAlterPartitionReassignmentsRequest(), true); checkRequest(createAlterPartitionReassignmentsRequest(), true);
checkErrorResponse(createAlterPartitionReassignmentsRequest(), new UnknownServerException(), true); checkErrorResponse(createAlterPartitionReassignmentsRequest(), unknownServerException, true);
checkResponse(createAlterPartitionReassignmentsResponse(), 0, true); checkResponse(createAlterPartitionReassignmentsResponse(), 0, true);
checkRequest(createListPartitionReassignmentsRequest(), true); checkRequest(createListPartitionReassignmentsRequest(), true);
checkErrorResponse(createListPartitionReassignmentsRequest(), new UnknownServerException(), true); checkErrorResponse(createListPartitionReassignmentsRequest(), unknownServerException, true);
checkResponse(createListPartitionReassignmentsResponse(), 0, true); checkResponse(createListPartitionReassignmentsResponse(), 0, true);
checkRequest(createOffsetDeleteRequest(), true); checkRequest(createOffsetDeleteRequest(), true);
checkErrorResponse(createOffsetDeleteRequest(), new UnknownServerException(), true); checkErrorResponse(createOffsetDeleteRequest(), unknownServerException, true);
checkResponse(createOffsetDeleteResponse(), 0, true); checkResponse(createOffsetDeleteResponse(), 0, true);
checkRequest(createAlterReplicaLogDirsRequest(), true); checkRequest(createAlterReplicaLogDirsRequest(), true);
checkErrorResponse(createAlterReplicaLogDirsRequest(), new UnknownServerException(), true); checkErrorResponse(createAlterReplicaLogDirsRequest(), unknownServerException, true);
checkResponse(createAlterReplicaLogDirsResponse(), 0, true); checkResponse(createAlterReplicaLogDirsResponse(), 0, true);
checkRequest(createDescribeClientQuotasRequest(), true);
checkErrorResponse(createDescribeClientQuotasRequest(), unknownServerException, true);
checkResponse(createDescribeClientQuotasResponse(), 0, true);
checkRequest(createAlterClientQuotasRequest(), true);
checkErrorResponse(createAlterClientQuotasRequest(), unknownServerException, true);
checkResponse(createAlterClientQuotasResponse(), 0, true);
} }
@Test @Test
@ -496,7 +509,7 @@ public class RequestResponseTest {
private void checkOlderFetchVersions() throws Exception { private void checkOlderFetchVersions() throws Exception {
int latestVersion = FETCH.latestVersion(); int latestVersion = FETCH.latestVersion();
for (int i = 0; i < latestVersion; ++i) { for (int i = 0; i < latestVersion; ++i) {
checkErrorResponse(createFetchRequest(i), new UnknownServerException(), true); checkErrorResponse(createFetchRequest(i), unknownServerException, true);
checkRequest(createFetchRequest(i), true); checkRequest(createFetchRequest(i), true);
checkResponse(createFetchResponse(), i, true); checkResponse(createFetchResponse(), i, true);
} }
@ -551,7 +564,13 @@ public class RequestResponseTest {
} }
private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) { private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) {
checkResponse(req.getErrorResponse(e), req.version(), checkEqualityAndHashCode); AbstractResponse response = req.getErrorResponse(e);
checkResponse(response, req.version(), checkEqualityAndHashCode);
if (e instanceof UnknownServerException) {
String responseStr = response.toStruct(req.version()).toString();
assertFalse(String.format("Unknown message included in response for %s: %s ", req.api, responseStr),
responseStr.contains(e.getMessage()));
}
} }
private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) { private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) {
@ -2325,4 +2344,25 @@ public class RequestResponseTest {
return new AlterReplicaLogDirsResponse(data); return new AlterReplicaLogDirsResponse(data);
} }
private DescribeClientQuotasRequest createDescribeClientQuotasRequest() {
ClientQuotaFilter filter = ClientQuotaFilter.all();
return new DescribeClientQuotasRequest.Builder(filter).build((short) 0);
}
private DescribeClientQuotasResponse createDescribeClientQuotasResponse() {
ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
return new DescribeClientQuotasResponse(Collections.singletonMap(entity, Collections.singletonMap("request_percentage", 1.0)), 0);
}
private AlterClientQuotasRequest createAlterClientQuotasRequest() {
ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
ClientQuotaAlteration.Op op = new ClientQuotaAlteration.Op("request_percentage", 2.0);
ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, Collections.singleton(op));
return new AlterClientQuotasRequest.Builder(Collections.singleton(alteration), false).build((short) 0);
}
private AlterClientQuotasResponse createAlterClientQuotasResponse() {
ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
return new AlterClientQuotasResponse(Collections.singletonMap(entity, ApiError.NONE), 0);
}
} }

View File

@ -23,6 +23,7 @@ import java.util.{Collections, Properties}
import joptsimple._ import joptsimple._
import kafka.common.Config import kafka.common.Config
import kafka.log.LogConfig import kafka.log.LogConfig
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig} import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncoder} import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncoder}
import kafka.utils.Implicits._ import kafka.utils.Implicits._
@ -364,6 +365,14 @@ object ConfigCommand extends Config {
adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
case ConfigType.User | ConfigType.Client => case ConfigType.User | ConfigType.Client =>
val nonQuotaConfigsToAdd = configsToBeAdded.keys.filterNot(QuotaConfigs.isQuotaConfig)
if (nonQuotaConfigsToAdd.nonEmpty)
throw new IllegalArgumentException(s"Only quota configs can be added for '$entityTypeHead' using --bootstrap-server. Unexpected config names: $nonQuotaConfigsToAdd")
val nonQuotaConfigsToDelete = configsToBeDeleted.filterNot(QuotaConfigs.isQuotaConfig)
if (nonQuotaConfigsToDelete.nonEmpty)
throw new IllegalArgumentException(s"Only quota configs can be deleted for '$entityTypeHead' using --bootstrap-server. Unexpected config names: $nonQuotaConfigsToDelete")
val oldConfig = getClientQuotasConfig(adminClient, entityTypes, entityNames) val oldConfig = getClientQuotasConfig(adminClient, entityTypes, entityNames)
val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains) val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)

View File

@ -23,6 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig import kafka.log.LogConfig
import kafka.utils.Log4jController import kafka.utils.Log4jController
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.utils._ import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient} import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.AlterConfigOp import org.apache.kafka.clients.admin.AlterConfigOp
@ -884,19 +885,20 @@ class AdminManager(val config: KafkaConfig,
!name.isDefined || !strict !name.isDefined || !strict
} }
def fromProps(props: Properties): Map[String, Double] = { def fromProps(props: Map[String, String]): Map[String, Double] = {
props.asScala.map { case (key, value) => props.map { case (key, value) =>
val doubleValue = try value.toDouble catch { val doubleValue = try value.toDouble catch {
case _: NumberFormatException => case _: NumberFormatException =>
throw new IllegalStateException(s"Unexpected client quota configuration value: ${key} -> ${value}") throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
} }
key -> doubleValue key -> doubleValue
} }
} }
(userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) => (userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) =>
if (!p.isEmpty && matches(userComponent, u) && matches(clientIdComponent, c)) val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isQuotaConfig(key) }
Some(userClientIdToEntity(u, c) -> fromProps(p)) if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
Some(userClientIdToEntity(u, c) -> fromProps(quotaProps))
else else
None None
}.flatten.toMap }.flatten.toMap

View File

@ -67,11 +67,20 @@ object DynamicConfig {
def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true) def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true)
} }
object Client { object QuotaConfigs {
//Properties
val ProducerByteRateOverrideProp = "producer_byte_rate" val ProducerByteRateOverrideProp = "producer_byte_rate"
val ConsumerByteRateOverrideProp = "consumer_byte_rate" val ConsumerByteRateOverrideProp = "consumer_byte_rate"
val RequestPercentageOverrideProp = "request_percentage" val RequestPercentageOverrideProp = "request_percentage"
private val configNames = Set(ProducerByteRateOverrideProp, ConsumerByteRateOverrideProp, RequestPercentageOverrideProp)
def isQuotaConfig(name: String): Boolean = configNames.contains(name)
}
object Client {
//Properties
val ProducerByteRateOverrideProp = QuotaConfigs.ProducerByteRateOverrideProp
val ConsumerByteRateOverrideProp = QuotaConfigs.ConsumerByteRateOverrideProp
val RequestPercentageOverrideProp = QuotaConfigs.RequestPercentageOverrideProp
//Defaults //Defaults
val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault

View File

@ -485,6 +485,27 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
testShouldAddClientConfig(None, Some(null)) testShouldAddClientConfig(None, Some(null))
} }
@Test
def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = {
val node = new Node(1, "localhost", 9092)
val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node)
def verifyCommand(entityType: String, alterOpts: String*): Unit = {
val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
"--entity-type", entityType, "--entity-name", "admin",
"--alter") ++ alterOpts)
val e = intercept[IllegalArgumentException] {
ConfigCommand.alterConfig(mockAdminClient, opts)
}
assertTrue(s"Unexpected exception: $e", e.getMessage.contains("some_config"))
}
verifyCommand("users", "--add-config", "consumer_byte_rate=20000,producer_byte_rate=10000,some_config=10")
verifyCommand("clients", "--add-config", "some_config=10")
verifyCommand("users", "--delete-config", "consumer_byte_rate=20000,some_config=10")
verifyCommand("clients", "--delete-config", "some_config=10")
}
@Test @Test
def shouldAddTopicConfigUsingZookeeper(): Unit = { def shouldAddTopicConfigUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,

View File

@ -23,9 +23,10 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse} import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
import org.junit.Assert._ import org.junit.Assert._
import org.junit.Test import org.junit.Test
import java.util.concurrent.{ExecutionException, TimeUnit} import java.util.concurrent.{ExecutionException, TimeUnit}
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
class ClientQuotasRequestTest extends BaseRequestTest { class ClientQuotasRequestTest extends BaseRequestTest {
@ -37,6 +38,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
@Test @Test
def testAlterClientQuotasRequest(): Unit = { def testAlterClientQuotasRequest(): Unit = {
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user"), (ClientQuotaEntity.CLIENT_ID -> "client-id")).asJava) val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user"), (ClientQuotaEntity.CLIENT_ID -> "client-id")).asJava)
// Expect an empty configuration. // Expect an empty configuration.
@ -162,6 +164,32 @@ class ClientQuotasRequestTest extends BaseRequestTest {
)) ))
} }
@Test
def testClientQuotasForScramUsers(): Unit = {
val entityType = ConfigType.User
val userName = "user"
val mechanism = ScramMechanism.SCRAM_SHA_256
val credential = new ScramFormatter(mechanism).generateCredential("password", 4096)
val configs = adminZkClient.fetchEntityConfig(entityType, userName)
configs.setProperty(mechanism.mechanismName, ScramCredentialUtils.credentialToString(credential))
adminZkClient.changeConfigs(entityType, userName, configs)
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava)
verifyDescribeEntityQuotas(entity, Map.empty)
alterEntityQuotas(entity, Map(
(ProducerByteRateProp -> Some(10000.0)),
(ConsumerByteRateProp -> Some(20000.0))
), validateOnly = false)
verifyDescribeEntityQuotas(entity, Map(
(ProducerByteRateProp -> 10000.0),
(ConsumerByteRateProp -> 20000.0)
))
}
@Test(expected = classOf[InvalidRequestException]) @Test(expected = classOf[InvalidRequestException])
def testAlterClientQuotasBadUser(): Unit = { def testAlterClientQuotasBadUser(): Unit = {
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava) val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)