From bb10ae4273451468b26fad755bfa41001ac6849c Mon Sep 17 00:00:00 2001 From: Divij Vaidya Date: Fri, 12 May 2023 20:21:00 +0200 Subject: [PATCH] KAFKA-14962: Trim whitespace from ACL configuration (#13670) Reviewers: Manikumar Reddy , Christo Lolov --- .../kafka/common/config/AbstractConfig.java | 3 ++- .../common/config/AbstractConfigTest.java | 2 ++ .../security/authorizer/AclAuthorizer.scala | 18 +++++++++--------- .../security/authorizer/AuthorizerTest.scala | 18 ++++++++++++++++++ .../authorizer/StandardAuthorizer.java | 4 ++-- 5 files changed, 33 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index bbfd15c12a3..2280813db1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -194,7 +194,8 @@ public class AbstractConfig { } public String getString(String key) { - return (String) get(key); + final String res = (String) get(key); + return res == null ? res : res.trim(); } public ConfigDef.Type typeOf(String key) { diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index c0c6f8cee37..5859dc1dc12 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -48,8 +48,10 @@ public class AbstractConfigTest { @Test public void testConfiguredInstances() { + testValidInputs(" "); testValidInputs(""); testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter"); + testValidInputs(" org.apache.kafka.common.metrics.FakeMetricsReporter "); testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter"); testInvalidInputs(","); testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter"); diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index ce3e2bd0de3..34070d16378 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -97,7 +97,7 @@ object AclAuthorizer { private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = { val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp). - map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean + map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean if (!zkSslClientEnable) new ZKClientConfig else { @@ -109,9 +109,9 @@ object AclAuthorizer { configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue => zkClientConfig.setProperty(sysProp, if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp) - (prefixedValue.toString.toUpperCase == "HTTPS").toString + (prefixedValue.toString.trim.toUpperCase == "HTTPS").toString else - prefixedValue.toString) + prefixedValue.toString.trim) } } zkClientConfig @@ -185,22 +185,22 @@ class AclAuthorizer extends Authorizer with Logging { override def configure(javaConfigs: util.Map[String, _]): Unit = { val configs = javaConfigs.asScala val props = new java.util.Properties() - configs.forKeyValue { (key, value) => props.put(key, value.toString) } + configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) } superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect { case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet }.getOrElse(Set.empty[KafkaPrincipal]) - shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) + shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.trim.toBoolean) // Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this // means that `KafkaConfig.zkConnect` must always be set by the user (even if `AclAuthorizer.ZkUrlProp` is also // set). val kafkaConfig = KafkaConfig.fromProps(props, doLog = false) - val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect) - val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs) - val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs) - val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests) + val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString.trim).getOrElse(kafkaConfig.zkConnect) + val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs) + val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs) + val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests) val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(kafkaConfig, configs) val time = Time.SYSTEM diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala index c39b785e38a..f855a58e3cb 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala @@ -326,6 +326,24 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest { } } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array(KRAFT, ZK)) + def testAclConfigWithWhitespace(quorum: String): Unit = { + val props = properties + props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, " true") + // replace all property values with leading & trailing whitespaces + props.replaceAll((_,v) => " " + v + " ") + val cfg = KafkaConfig.fromProps(props) + var testAuthorizer: Authorizer = null + try { + testAuthorizer = createAuthorizer(cfg.originals) + assertTrue(authorize(testAuthorizer, requestContext, READ, resource), + "when acls = null or [], authorizer should allow op with allow.everyone = true.") + } finally { + testAuthorizer.close() + } + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array(KRAFT, ZK)) def testAclManagementAPIs(quorum: String): Unit = { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java index 0725659ae4f..47f066c34b4 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java @@ -169,7 +169,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer { AuthorizationResult defaultResult = getDefaultResult(configs); int nodeId; try { - nodeId = Integer.parseInt(configs.get("node.id").toString()); + nodeId = Integer.parseInt(configs.get("node.id").toString().trim()); } catch (Exception e) { nodeId = -1; } @@ -204,6 +204,6 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer { static AuthorizationResult getDefaultResult(Map configs) { Object configValue = configs.get(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG); if (configValue == null) return DENIED; - return Boolean.valueOf(configValue.toString()) ? ALLOWED : DENIED; + return Boolean.parseBoolean(configValue.toString().trim()) ? ALLOWED : DENIED; } }