mirror of https://github.com/apache/kafka.git
KAFKA-14962: Trim whitespace from ACL configuration (#13670)
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Christo Lolov <lolovc@amazon.com>
This commit is contained in:
parent
f578b38f31
commit
bb10ae4273
|
@ -194,7 +194,8 @@ public class AbstractConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getString(String key) {
|
public String getString(String key) {
|
||||||
return (String) get(key);
|
final String res = (String) get(key);
|
||||||
|
return res == null ? res : res.trim();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConfigDef.Type typeOf(String key) {
|
public ConfigDef.Type typeOf(String key) {
|
||||||
|
|
|
@ -48,8 +48,10 @@ public class AbstractConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConfiguredInstances() {
|
public void testConfiguredInstances() {
|
||||||
|
testValidInputs(" ");
|
||||||
testValidInputs("");
|
testValidInputs("");
|
||||||
testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter");
|
testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter");
|
||||||
|
testValidInputs(" org.apache.kafka.common.metrics.FakeMetricsReporter ");
|
||||||
testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter");
|
testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter");
|
||||||
testInvalidInputs(",");
|
testInvalidInputs(",");
|
||||||
testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
|
testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
|
||||||
|
|
|
@ -97,7 +97,7 @@ object AclAuthorizer {
|
||||||
|
|
||||||
private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
|
private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
|
||||||
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
|
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
|
||||||
map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
|
map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
|
||||||
if (!zkSslClientEnable)
|
if (!zkSslClientEnable)
|
||||||
new ZKClientConfig
|
new ZKClientConfig
|
||||||
else {
|
else {
|
||||||
|
@ -109,9 +109,9 @@ object AclAuthorizer {
|
||||||
configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
|
configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
|
||||||
zkClientConfig.setProperty(sysProp,
|
zkClientConfig.setProperty(sysProp,
|
||||||
if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
|
if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
|
||||||
(prefixedValue.toString.toUpperCase == "HTTPS").toString
|
(prefixedValue.toString.trim.toUpperCase == "HTTPS").toString
|
||||||
else
|
else
|
||||||
prefixedValue.toString)
|
prefixedValue.toString.trim)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
zkClientConfig
|
zkClientConfig
|
||||||
|
@ -185,22 +185,22 @@ class AclAuthorizer extends Authorizer with Logging {
|
||||||
override def configure(javaConfigs: util.Map[String, _]): Unit = {
|
override def configure(javaConfigs: util.Map[String, _]): Unit = {
|
||||||
val configs = javaConfigs.asScala
|
val configs = javaConfigs.asScala
|
||||||
val props = new java.util.Properties()
|
val props = new java.util.Properties()
|
||||||
configs.forKeyValue { (key, value) => props.put(key, value.toString) }
|
configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) }
|
||||||
|
|
||||||
superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
|
superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
|
||||||
case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
|
case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
|
||||||
}.getOrElse(Set.empty[KafkaPrincipal])
|
}.getOrElse(Set.empty[KafkaPrincipal])
|
||||||
|
|
||||||
shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
|
shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.trim.toBoolean)
|
||||||
|
|
||||||
// Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
|
// Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
|
||||||
// means that `KafkaConfig.zkConnect` must always be set by the user (even if `AclAuthorizer.ZkUrlProp` is also
|
// means that `KafkaConfig.zkConnect` must always be set by the user (even if `AclAuthorizer.ZkUrlProp` is also
|
||||||
// set).
|
// set).
|
||||||
val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
|
val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
|
||||||
val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
|
val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString.trim).getOrElse(kafkaConfig.zkConnect)
|
||||||
val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
|
val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
|
||||||
val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
|
val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
|
||||||
val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
|
val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
|
||||||
|
|
||||||
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(kafkaConfig, configs)
|
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(kafkaConfig, configs)
|
||||||
val time = Time.SYSTEM
|
val time = Time.SYSTEM
|
||||||
|
|
|
@ -326,6 +326,24 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
|
@ValueSource(strings = Array(KRAFT, ZK))
|
||||||
|
def testAclConfigWithWhitespace(quorum: String): Unit = {
|
||||||
|
val props = properties
|
||||||
|
props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, " true")
|
||||||
|
// replace all property values with leading & trailing whitespaces
|
||||||
|
props.replaceAll((_,v) => " " + v + " ")
|
||||||
|
val cfg = KafkaConfig.fromProps(props)
|
||||||
|
var testAuthorizer: Authorizer = null
|
||||||
|
try {
|
||||||
|
testAuthorizer = createAuthorizer(cfg.originals)
|
||||||
|
assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
|
||||||
|
"when acls = null or [], authorizer should allow op with allow.everyone = true.")
|
||||||
|
} finally {
|
||||||
|
testAuthorizer.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
|
||||||
@ValueSource(strings = Array(KRAFT, ZK))
|
@ValueSource(strings = Array(KRAFT, ZK))
|
||||||
def testAclManagementAPIs(quorum: String): Unit = {
|
def testAclManagementAPIs(quorum: String): Unit = {
|
||||||
|
|
|
@ -169,7 +169,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
|
||||||
AuthorizationResult defaultResult = getDefaultResult(configs);
|
AuthorizationResult defaultResult = getDefaultResult(configs);
|
||||||
int nodeId;
|
int nodeId;
|
||||||
try {
|
try {
|
||||||
nodeId = Integer.parseInt(configs.get("node.id").toString());
|
nodeId = Integer.parseInt(configs.get("node.id").toString().trim());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
nodeId = -1;
|
nodeId = -1;
|
||||||
}
|
}
|
||||||
|
@ -204,6 +204,6 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
|
||||||
static AuthorizationResult getDefaultResult(Map<String, ?> configs) {
|
static AuthorizationResult getDefaultResult(Map<String, ?> configs) {
|
||||||
Object configValue = configs.get(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG);
|
Object configValue = configs.get(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG);
|
||||||
if (configValue == null) return DENIED;
|
if (configValue == null) return DENIED;
|
||||||
return Boolean.valueOf(configValue.toString()) ? ALLOWED : DENIED;
|
return Boolean.parseBoolean(configValue.toString().trim()) ? ALLOWED : DENIED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue