KAFKA-14943: Fix ClientQuotaControlManager validation

Don't allow setting negative or zero values for quotas. Don't allow SCRAM mechanism names to be
used as client quota names. SCRAM mechanisms are not client quotas. (The confusion arose because of
internal ZK representation details that treated them both as "client configs.")

Add unit tests for ClientQuotaControlManager.isValidIpEntity and
ClientQuotaControlManager.configKeysForEntityType.

This change doesn't affect metadata record application, only input validation. If there are bad
client quotas that are set currently, this change will not alter the current behavior (of throwing
an exception and ignoring the bad quota).
This commit is contained in:
Colin P. McCabe 2023-04-26 16:10:46 -07:00
parent 8bde4e79cd
commit 7049333617
4 changed files with 242 additions and 45 deletions

View File

@ -46,9 +46,11 @@ public class QuotaConfigs {
public static final int IP_CONNECTION_RATE_DEFAULT = Integer.MAX_VALUE;
private static Set<String> userClientConfigNames = new HashSet<>(Arrays.asList(
PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
REQUEST_PERCENTAGE_OVERRIDE_CONFIG, CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG
private final static Set<String> USER_AND_CLIENT_QUOTA_NAMES = new HashSet<>(Arrays.asList(
PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
REQUEST_PERCENTAGE_OVERRIDE_CONFIG,
CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG
));
private static void buildUserClientQuotaConfigDef(ConfigDef configDef) {
@ -68,10 +70,16 @@ public class QuotaConfigs {
}
public static boolean isClientOrUserConfig(String name) {
return userClientConfigNames.contains(name);
return USER_AND_CLIENT_QUOTA_NAMES.contains(name);
}
public static ConfigDef userConfigs() {
public static ConfigDef userAndClientQuotaConfigs() {
ConfigDef configDef = new ConfigDef();
buildUserClientQuotaConfigDef(configDef);
return configDef;
}
public static ConfigDef scramMechanismsPlusUserAndClientQuotaConfigs() {
ConfigDef configDef = new ConfigDef();
ScramMechanism.mechanismNames().forEach(mechanismName -> {
configDef.define(mechanismName, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
@ -81,12 +89,6 @@ public class QuotaConfigs {
return configDef;
}
public static ConfigDef clientConfigs() {
ConfigDef configDef = new ConfigDef();
buildUserClientQuotaConfigDef(configDef);
return configDef;
}
public static ConfigDef ipConfigs() {
ConfigDef configDef = new ConfigDef();
configDef.define(IP_CONNECTION_RATE_OVERRIDE_CONFIG, ConfigDef.Type.INT, Integer.MAX_VALUE,

View File

@ -71,7 +71,7 @@ object DynamicConfig {
}
object Client {
private val clientConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.clientConfigs()
private val clientConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.userAndClientQuotaConfigs()
def configKeys = clientConfigs.configKeys
@ -81,7 +81,7 @@ object DynamicConfig {
}
object User {
private val userConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.userConfigs()
private val userConfigs = org.apache.kafka.common.config.internals.QuotaConfigs.scramMechanismsPlusUserAndClientQuotaConfigs()
def configKeys = userConfigs.configKeys

View File

@ -115,11 +115,11 @@ public class ClientQuotaControlManager {
}
private void alterClientQuotaEntity(
ClientQuotaEntity entity,
Map<String, Double> newQuotaConfigs,
List<ApiMessageAndVersion> outputRecords,
Map<ClientQuotaEntity, ApiError> outputResults) {
ClientQuotaEntity entity,
Map<String, Double> newQuotaConfigs,
List<ApiMessageAndVersion> outputRecords,
Map<ClientQuotaEntity, ApiError> outputResults
) {
// Check entity types and sanitize the names
Map<String, String> validatedEntityMap = new HashMap<>(3);
ApiError error = validateEntity(entity, validatedEntityMap);
@ -181,7 +181,7 @@ public class ClientQuotaControlManager {
outputResults.put(entity, ApiError.NONE);
}
private ApiError configKeysForEntityType(Map<String, String> entity, Map<String, ConfigDef.ConfigKey> output) {
static ApiError configKeysForEntityType(Map<String, String> entity, Map<String, ConfigDef.ConfigKey> output) {
// We only allow certain combinations of quota entity types. Which type is in use determines which config
// keys are valid
boolean hasUser = entity.containsKey(ClientQuotaEntity.USER);
@ -200,12 +200,8 @@ public class ClientQuotaControlManager {
return new ApiError(Errors.INVALID_REQUEST, entity.get(ClientQuotaEntity.IP) + " is not a valid IP or resolvable host.");
}
}
} else if (hasUser && hasClientId) {
configKeys = QuotaConfigs.userConfigs().configKeys();
} else if (hasUser) {
configKeys = QuotaConfigs.userConfigs().configKeys();
} else if (hasClientId) {
configKeys = QuotaConfigs.clientConfigs().configKeys();
} else if (hasUser || hasClientId) {
configKeys = QuotaConfigs.userAndClientQuotaConfigs().configKeys();
} else {
return new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity");
}
@ -214,46 +210,64 @@ public class ClientQuotaControlManager {
return ApiError.NONE;
}
private ApiError validateQuotaKeyValue(Map<String, ConfigDef.ConfigKey> validKeys, String key, Double value) {
// TODO can this validation be shared with alter configs?
static ApiError validateQuotaKeyValue(
Map<String, ConfigDef.ConfigKey> validKeys,
String key,
double value
) {
// Ensure we have an allowed quota key
ConfigDef.ConfigKey configKey = validKeys.get(key);
if (configKey == null) {
return new ApiError(Errors.INVALID_REQUEST, "Invalid configuration key " + key);
}
if (value <= 0.0) {
return new ApiError(Errors.INVALID_REQUEST, "Quota " + key + " must be greater than 0");
}
// Ensure the quota value is valid
switch (configKey.type()) {
case DOUBLE:
break;
return ApiError.NONE;
case SHORT:
case INT:
case LONG:
Double epsilon = 1e-6;
Long longValue = Double.valueOf(value + epsilon).longValue();
if (Math.abs(longValue.doubleValue() - value) > epsilon) {
if (value > Short.MAX_VALUE) {
return new ApiError(Errors.INVALID_REQUEST,
"Configuration " + key + " must be a Long value");
"Proposed value for " + key + " is too large for a SHORT.");
}
break;
return getErrorForIntegralQuotaValue(value, key);
case INT:
if (value > Integer.MAX_VALUE) {
return new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for an INT.");
}
return getErrorForIntegralQuotaValue(value, key);
case LONG: {
if (value > Long.MAX_VALUE) {
return new ApiError(Errors.INVALID_REQUEST,
"Proposed value for " + key + " is too large for a LONG.");
}
return getErrorForIntegralQuotaValue(value, key);
}
default:
return new ApiError(Errors.UNKNOWN_SERVER_ERROR,
"Unexpected config type " + configKey.type() + " should be Long or Double");
}
}
static ApiError getErrorForIntegralQuotaValue(double value, String key) {
double remainder = Math.abs(value % 1.0);
if (remainder > 1e-6) {
return new ApiError(Errors.INVALID_REQUEST, key + " cannot be a fractional value.");
}
return ApiError.NONE;
}
// TODO move this somewhere common?
private boolean isValidIpEntity(String ip) {
if (Objects.nonNull(ip)) {
try {
InetAddress.getByName(ip);
return true;
} catch (UnknownHostException e) {
return false;
}
} else {
static boolean isValidIpEntity(String ip) {
if (ip == null) return true;
try {
InetAddress.getByName(ip);
return true;
} catch (UnknownHostException e) {
return false;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.controller;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.internals.QuotaConfigs;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
@ -36,6 +37,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@ -306,4 +308,183 @@ public class ClientQuotaControlManagerTest {
entries.put(ClientQuotaEntity.CLIENT_ID, clientId);
return new ClientQuotaEntity(entries);
}
@Test
public void testIsValidIpEntityWithNull() {
assertTrue(ClientQuotaControlManager.isValidIpEntity(null));
}
@Test
public void testIsValidIpEntityWithUnresolvableHostname() {
// example.invalid will never be valid, as per RFC 2606.
assertFalse(ClientQuotaControlManager.isValidIpEntity("example.invalid"));
}
@Test
public void testIsValidIpEntityWithLocalhost() {
assertTrue(ClientQuotaControlManager.isValidIpEntity("127.0.0.1"));
}
@Test
public void testConfigKeysForEntityTypeWithUser() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.USER),
Arrays.asList(
"producer_byte_rate",
"consumer_byte_rate",
"controller_mutation_rate",
"request_percentage"
));
}
@Test
public void testConfigKeysForEntityTypeWithClientId() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.CLIENT_ID),
Arrays.asList(
"producer_byte_rate",
"consumer_byte_rate",
"controller_mutation_rate",
"request_percentage"
));
}
@Test
public void testConfigKeysForEntityTypeWithUserAndClientId() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.CLIENT_ID, ClientQuotaEntity.USER),
Arrays.asList(
"producer_byte_rate",
"consumer_byte_rate",
"controller_mutation_rate",
"request_percentage"
));
}
@Test
public void testConfigKeysForEntityTypeWithIp() {
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.IP),
Arrays.asList(
"connection_creation_rate"
));
}
private static Map<String, String> keysToEntity(List<String> entityKeys) {
HashMap<String, String> entity = new HashMap<>();
for (String entityKey : entityKeys) {
if (entityKey.equals(ClientQuotaEntity.IP)) {
entity.put(entityKey, "127.0.0.1");
} else {
entity.put(entityKey, "foo");
}
}
return entity;
}
private static void testConfigKeysForEntityType(
List<String> entityKeys,
List<String> expectedConfigs
) {
HashMap<String, ConfigDef.ConfigKey> output = new HashMap<>();
assertEquals(ApiError.NONE, ClientQuotaControlManager.configKeysForEntityType(
keysToEntity(entityKeys), output));
assertEquals(new HashSet<>(expectedConfigs), output.keySet());
}
@Test
public void testConfigKeysForEmptyEntity() {
testConfigKeysError(Arrays.asList(),
new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity"));
}
@Test
public void testConfigKeysForEntityTypeWithIpAndUser() {
testConfigKeysError(Arrays.asList(ClientQuotaEntity.IP, ClientQuotaEntity.USER),
new ApiError(Errors.INVALID_REQUEST, "Invalid quota entity combination, IP entity should" +
"not be combined with User or ClientId"));
}
@Test
public void testConfigKeysForEntityTypeWithIpAndClientId() {
testConfigKeysError(Arrays.asList(ClientQuotaEntity.IP, ClientQuotaEntity.CLIENT_ID),
new ApiError(Errors.INVALID_REQUEST, "Invalid quota entity combination, IP entity should" +
"not be combined with User or ClientId"));
}
private static void testConfigKeysError(List<String> entityKeys, ApiError expectedError) {
testConfigKeysError(keysToEntity(entityKeys), expectedError);
}
@Test
public void testConfigKeysForUnresolvableIpEntity() {
testConfigKeysError(Collections.singletonMap(ClientQuotaEntity.IP, "example.invalid"),
new ApiError(Errors.INVALID_REQUEST, "example.invalid is not a valid IP or resolvable host."));
}
private static void testConfigKeysError(
Map<String, String> entity,
ApiError expectedError
) {
HashMap<String, ConfigDef.ConfigKey> output = new HashMap<>();
assertEquals(expectedError, ClientQuotaControlManager.configKeysForEntityType(entity, output));
}
private final static HashMap<String, ConfigDef.ConfigKey> VALID_CLIENT_ID_QUOTA_KEYS;
static {
VALID_CLIENT_ID_QUOTA_KEYS = new HashMap<>();
assertEquals(ApiError.NONE, ClientQuotaControlManager.configKeysForEntityType(
keysToEntity(Arrays.asList(ClientQuotaEntity.CLIENT_ID)), VALID_CLIENT_ID_QUOTA_KEYS));
}
@Test
public void testValidateQuotaKeyValueForUnknownQuota() {
assertEquals(new ApiError(Errors.INVALID_REQUEST, "Invalid configuration key foobar"),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "foobar", 1.0));
}
@Test
public void testValidateQuotaKeyValueForZeroQuota() {
assertEquals(new ApiError(Errors.INVALID_REQUEST, "Quota producer_byte_rate must be greater than 0"),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "producer_byte_rate", 0.0));
}
@Test
public void testValidateQuotaKeyValueForNegativeQuota() {
assertEquals(new ApiError(Errors.INVALID_REQUEST, "Quota consumer_byte_rate must be greater than 0"),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", -2.0));
}
@Test
public void testValidateQuotaKeyValueForValidConsumerByteRate() {
assertEquals(ApiError.NONE, ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 1234.0));
}
@Test
public void testValidateQuotaKeyValueForConsumerByteRateTooLarge() {
assertEquals(new ApiError(Errors.INVALID_REQUEST,
"Proposed value for consumer_byte_rate is too large for a LONG."),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 36893488147419103232.4));
}
@Test
public void testValidateQuotaKeyValueForFractionalConsumerByteRate() {
assertEquals(new ApiError(Errors.INVALID_REQUEST, "consumer_byte_rate cannot be a fractional value."),
ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 2.245));
}
@Test
public void testValidateQuotaKeyValueForValidConsumerByteRate2() {
assertEquals(ApiError.NONE, ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "consumer_byte_rate", 1235.0000001));
}
@Test
public void testValidateQuotaKeyValueForValidRequestPercentage() {
assertEquals(ApiError.NONE, ClientQuotaControlManager.validateQuotaKeyValue(
VALID_CLIENT_ID_QUOTA_KEYS, "request_percentage", 56.62367));
}
}