mirror of https://github.com/apache/kafka.git
KAFKA-3052; Broker properties get logged twice if acl enabled
Fix it by making it possible to pass the `doLog` parameter to `AbstractConfig`. As explained in the code comments, this means that we can continue to benefit from ZK default settings as specified in `KafkaConfig` without having to duplicate code. Also: * Removed unused private methods from `KafkaConfig` * Removed `case` modifier from `KafkaConfig` so that `hashCode`, `equals` and `toString` from `AbstractConfig` are used. * Made `props` a `val` and added `apply` method to `KafkaConfig` to remain binary compatible. * Call authorizer.close even if an exception is thrown during `configure`. Author: Ismael Juma <ismael@juma.me.uk> Reviewers: Guozhang Wang Closes #725 from ijuma/kafka-3052-broker-properties-get-logged-twice-if-acl-enabled
This commit is contained in:
parent
2679524604
commit
f9642e2a98
|
@ -67,8 +67,10 @@ object AclCommand {
|
|||
|
||||
val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
|
||||
val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
|
||||
try {
|
||||
authZ.configure(authorizerProperties.asJava)
|
||||
try f(authZ)
|
||||
f(authZ)
|
||||
}
|
||||
finally CoreUtils.swallow(authZ.close())
|
||||
}
|
||||
|
||||
|
|
|
@ -80,18 +80,21 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
|
|||
override def configure(javaConfigs: util.Map[String, _]) {
|
||||
val configs = javaConfigs.asScala
|
||||
val props = new java.util.Properties()
|
||||
configs foreach { case (key, value) => props.put(key, value.toString) }
|
||||
val kafkaConfig = KafkaConfig.fromProps(props)
|
||||
configs.foreach { case (key, value) => props.put(key, value.toString) }
|
||||
|
||||
superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect {
|
||||
case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet
|
||||
}.getOrElse(Set.empty[KafkaPrincipal])
|
||||
|
||||
shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).map(_.toString.toBoolean).getOrElse(false)
|
||||
shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
|
||||
|
||||
val zkUrl = configs.getOrElse(SimpleAclAuthorizer.ZkUrlProp, kafkaConfig.zkConnect).toString
|
||||
val zkConnectionTimeoutMs = configs.getOrElse(SimpleAclAuthorizer.ZkConnectionTimeOutProp, kafkaConfig.zkConnectionTimeoutMs).toString.toInt
|
||||
val zkSessionTimeOutMs = configs.getOrElse(SimpleAclAuthorizer.ZkSessionTimeOutProp, kafkaConfig.zkSessionTimeoutMs).toString.toInt
|
||||
// Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
|
||||
// means that `KafkaConfig.zkConnect` must always be set by the user (even if `SimpleAclAuthorizer.ZkUrlProp` is also
|
||||
// set).
|
||||
val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
|
||||
val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
|
||||
val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
|
||||
val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
|
||||
|
||||
zkUtils = ZkUtils(zkUrl,
|
||||
zkConnectionTimeoutMs,
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package kafka.server
|
||||
|
||||
import java.util
|
||||
import java.util.Properties
|
||||
|
||||
import kafka.api.ApiVersion
|
||||
|
@ -32,7 +31,6 @@ import org.apache.kafka.common.config.SaslConfigs
|
|||
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
|
||||
import org.apache.kafka.common.metrics.MetricsReporter
|
||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||
import org.apache.kafka.common.security.auth.PrincipalBuilder
|
||||
|
||||
import scala.collection.{Map, immutable}
|
||||
|
||||
|
@ -687,19 +685,27 @@ object KafkaConfig {
|
|||
require(names.contains(name), "Unknown configuration \"%s\".".format(name))
|
||||
}
|
||||
|
||||
def fromProps(props: Properties): KafkaConfig = {
|
||||
KafkaConfig(props)
|
||||
}
|
||||
def fromProps(props: Properties): KafkaConfig =
|
||||
fromProps(props, true)
|
||||
|
||||
def fromProps(defaults: Properties, overrides: Properties): KafkaConfig = {
|
||||
def fromProps(props: Properties, doLog: Boolean): KafkaConfig =
|
||||
new KafkaConfig(props, doLog)
|
||||
|
||||
def fromProps(defaults: Properties, overrides: Properties): KafkaConfig =
|
||||
fromProps(defaults, overrides, true)
|
||||
|
||||
def fromProps(defaults: Properties, overrides: Properties, doLog: Boolean): KafkaConfig = {
|
||||
val props = new Properties()
|
||||
props.putAll(defaults)
|
||||
props.putAll(overrides)
|
||||
fromProps(props)
|
||||
fromProps(props, doLog)
|
||||
}
|
||||
|
||||
def apply(props: java.util.Map[_, _]): KafkaConfig = new KafkaConfig(props, true)
|
||||
|
||||
}
|
||||
|
||||
case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) {
|
||||
class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends AbstractConfig(KafkaConfig.configDef, props, doLog) {
|
||||
|
||||
/** ********* Zookeeper Configuration ***********/
|
||||
val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
|
||||
|
@ -916,29 +922,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
|
|||
}
|
||||
}
|
||||
|
||||
private def getMetricClasses(metricClasses: java.util.List[String]): java.util.List[MetricsReporter] = {
|
||||
|
||||
val reporterList = new util.ArrayList[MetricsReporter]()
|
||||
val iterator = metricClasses.iterator()
|
||||
|
||||
while (iterator.hasNext) {
|
||||
val reporterName = iterator.next()
|
||||
if (!reporterName.isEmpty) {
|
||||
val reporter: MetricsReporter = CoreUtils.createObject[MetricsReporter](reporterName)
|
||||
reporter.configure(originals)
|
||||
reporterList.add(reporter)
|
||||
}
|
||||
}
|
||||
|
||||
reporterList
|
||||
|
||||
}
|
||||
|
||||
|
||||
private def getPrincipalBuilderClass(principalBuilderClass: String): PrincipalBuilder = {
|
||||
CoreUtils.createObject[PrincipalBuilder](principalBuilderClass)
|
||||
}
|
||||
|
||||
validateValues()
|
||||
|
||||
private def validateValues() {
|
||||
|
|
|
@ -131,7 +131,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
|||
}
|
||||
|
||||
def withAuthorizer(props: Properties)(f: Authorizer => Unit) {
|
||||
val kafkaConfig = KafkaConfig.fromProps(props)
|
||||
val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
|
||||
val authZ = new SimpleAclAuthorizer
|
||||
try {
|
||||
authZ.configure(kafkaConfig.originals)
|
||||
|
|
Loading…
Reference in New Issue