KAFKA-14588 ZK configuration moved to ZkConfig (#15075)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Nikolay 2024-03-27 17:37:01 +03:00 committed by GitHub
parent ace2152d22
commit 6f38fe5e0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 439 additions and 362 deletions

View File

@ -2160,6 +2160,7 @@ project(':streams') {
testImplementation project(':storage')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':server')
testImplementation libs.log4j
testImplementation libs.junitJupiter
testImplementation libs.junitVintageEngine
@ -2742,6 +2743,7 @@ project(':jmh-benchmarks') {
exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
}
implementation project(':server-common')
implementation project(':server')
implementation project(':clients')
implementation project(':group-coordinator')
implementation project(':metadata')
@ -2899,7 +2901,7 @@ project(':connect:json') {
api libs.jacksonAfterburner
implementation libs.slf4jApi
testImplementation libs.junitJupiter
testRuntimeOnly libs.slf4jlog4j
@ -2969,6 +2971,7 @@ project(':connect:runtime') {
testImplementation project(':metadata')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
testImplementation project(':server')
testImplementation project(':storage')
testImplementation project(':connect:test-plugins')

View File

@ -605,6 +605,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.eclipse.jetty.client"/>
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
<allow class="org.apache.kafka.server.config.ZkConfigs" />
</subpackage>
</subpackage>

View File

@ -55,6 +55,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -154,7 +155,7 @@ public class EmbeddedKafkaCluster {
}
private void doStart() {
brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
brokerConfig.put(ZkConfigs.ZK_CONNECT_PROP, zKConnectString());
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);

View File

@ -21,7 +21,6 @@ import java.util.Properties
import joptsimple._
import joptsimple.util.EnumConverter
import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.acl._
@ -33,6 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
@ -200,7 +200,7 @@ object AclCommand extends Logging {
// We will default the value of zookeeper.set.acl to true or false based on whether SASL is configured,
// but if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication
// then it will be up to the user to explicitly specify zookeeper.set.acl=true in the authorizer-properties.
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSaslEnabled)
val defaultProps = Map(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP -> JaasUtils.isZkSaslEnabled)
val authorizerPropertiesWithoutTls =
if (opts.options.has(opts.authorizerPropertiesOpt)) {
val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt)
@ -211,7 +211,7 @@ object AclCommand extends Logging {
val authorizerProperties =
if (opts.options.has(opts.zkTlsConfigFile)) {
// load in TLS configs both with and without the "authorizer." prefix
val validKeys = (KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList ++ KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.map("authorizer." + _).toList).asJava
val validKeys = (ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList ++ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.map("authorizer." + _).toList).asJava
authorizerPropertiesWithoutTls ++ Utils.loadProps(opts.options.valueOf(opts.zkTlsConfigFile), validKeys).asInstanceOf[java.util.Map[String, Any]].asScala
}
else
@ -619,7 +619,7 @@ object AclCommand extends Logging {
"DEPRECATED: Identifies the file where ZooKeeper client TLS connectivity properties are defined for" +
" the default authorizer kafka.security.authorizer.AclAuthorizer." +
" Any properties other than the following (with or without an \"authorizer.\" prefix) are ignored: " +
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") +
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(", ") +
". Note that if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication being used" +
" then it is necessary to explicitly specify --authorizer-properties zookeeper.set.acl=true. " +
AclCommand.AuthorizerDeprecationMessage)

View File

@ -35,7 +35,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals, ZkConfigs}
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
@ -864,7 +864,7 @@ object ConfigCommand extends Logging {
.ofType(classOf[String])
val zkTlsConfigFile: OptionSpec[String] = parser.accepts("zk-tls-config-file",
"Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " +
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") + " are ignored.")
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(", ") + " are ignored.")
.withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String])
options = parser.parse(args : _*)

View File

@ -24,6 +24,7 @@ import kafka.utils.Implicits._
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.KeeperException
@ -81,7 +82,7 @@ object ZkSecurityMigrator extends Logging {
if (jaasFile == null && !tlsClientAuthEnabled) {
val errorMsg = s"No JAAS configuration file has been specified and no TLS client certificate has been specified. Please make sure that you set " +
s"the system property ${JaasUtils.JAVA_LOGIN_CONFIG_PARAM} or provide a ZooKeeper client TLS configuration via --$tlsConfigFileOption <filename> " +
s"identifying at least ${KafkaConfig.ZkSslClientEnableProp}, ${KafkaConfig.ZkClientCnxnSocketProp}, and ${KafkaConfig.ZkSslKeyStoreLocationProp}"
s"identifying at least ${ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP}, ${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP}, and ${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP}"
System.err.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}
@ -124,7 +125,7 @@ object ZkSecurityMigrator extends Logging {
}
def createZkClientConfigFromFile(filename: String) : ZKClientConfig = {
val zkTlsConfigFileProps = Utils.loadProps(filename, KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.asJava)
val zkTlsConfigFileProps = Utils.loadProps(filename, ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.asJava)
val zkClientConfig = new ZKClientConfig() // Initializes based on any system properties that have been set
// Now override any set system properties with explicitly-provided values from the config file
// Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make
@ -156,7 +157,7 @@ object ZkSecurityMigrator extends Logging {
"before migration. If not, exit the command.")
val zkTlsConfigFile: OptionSpec[String] = parser.accepts(tlsConfigFileOption,
"Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " +
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.mkString(", ") + " are ignored.")
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.mkString(", ") + " are ignored.")
.withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String])
options = parser.parse(args : _*)
}

View File

@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV1
import org.apache.kafka.server.config.ZkConfigs
import org.apache.zookeeper.client.ZKClientConfig
import scala.annotation.nowarn
@ -96,7 +97,7 @@ object AclAuthorizer {
}
private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP).
map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
if (!zkSslClientEnable)
new ZKClientConfig
@ -105,10 +106,10 @@ object AclAuthorizer {
// be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = true)
// add in any prefixed overlays
KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) =>
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.forKeyValue { (kafkaProp, sysProp) =>
configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
zkClientConfig.setProperty(sysProp,
if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
if (kafkaProp == ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP)
(prefixedValue.toString.trim.toUpperCase == "HTTPS").toString
else
prefixedValue.toString.trim)

View File

@ -45,7 +45,7 @@ import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms}
import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Csv
@ -67,53 +67,15 @@ object KafkaConfig {
DynamicBrokerConfig.dynamicConfigUpdateModes))
}
/** ********* Zookeeper Configuration ***********/
val ZkConnectProp = "zookeeper.connect"
val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
val ZkEnableSecureAclsProp = "zookeeper.set.acl"
val ZkMaxInFlightRequestsProp = "zookeeper.max.in.flight.requests"
val ZkSslClientEnableProp = "zookeeper.ssl.client.enable"
val ZkClientCnxnSocketProp = "zookeeper.clientCnxnSocket"
val ZkSslKeyStoreLocationProp = "zookeeper.ssl.keystore.location"
val ZkSslKeyStorePasswordProp = "zookeeper.ssl.keystore.password"
val ZkSslKeyStoreTypeProp = "zookeeper.ssl.keystore.type"
val ZkSslTrustStoreLocationProp = "zookeeper.ssl.truststore.location"
val ZkSslTrustStorePasswordProp = "zookeeper.ssl.truststore.password"
val ZkSslTrustStoreTypeProp = "zookeeper.ssl.truststore.type"
val ZkSslProtocolProp = "zookeeper.ssl.protocol"
val ZkSslEnabledProtocolsProp = "zookeeper.ssl.enabled.protocols"
val ZkSslCipherSuitesProp = "zookeeper.ssl.cipher.suites"
val ZkSslEndpointIdentificationAlgorithmProp = "zookeeper.ssl.endpoint.identification.algorithm"
val ZkSslCrlEnableProp = "zookeeper.ssl.crl.enable"
val ZkSslOcspEnableProp = "zookeeper.ssl.ocsp.enable"
// a map from the Kafka config to the corresponding ZooKeeper Java system property
private[kafka] val ZkSslConfigToSystemPropertyMap: Map[String, String] = Map(
ZkSslClientEnableProp -> ZKClientConfig.SECURE_CLIENT,
ZkClientCnxnSocketProp -> ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
ZkSslKeyStoreLocationProp -> "zookeeper.ssl.keyStore.location",
ZkSslKeyStorePasswordProp -> "zookeeper.ssl.keyStore.password",
ZkSslKeyStoreTypeProp -> "zookeeper.ssl.keyStore.type",
ZkSslTrustStoreLocationProp -> "zookeeper.ssl.trustStore.location",
ZkSslTrustStorePasswordProp -> "zookeeper.ssl.trustStore.password",
ZkSslTrustStoreTypeProp -> "zookeeper.ssl.trustStore.type",
ZkSslProtocolProp -> "zookeeper.ssl.protocol",
ZkSslEnabledProtocolsProp -> "zookeeper.ssl.enabledProtocols",
ZkSslCipherSuitesProp -> "zookeeper.ssl.ciphersuites",
ZkSslEndpointIdentificationAlgorithmProp -> "zookeeper.ssl.hostnameVerification",
ZkSslCrlEnableProp -> "zookeeper.ssl.crl",
ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp")
private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName)))
Option(clientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName)))
}
private[kafka] def setZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String, kafkaPropValue: Any): Unit = {
clientConfig.setProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName),
clientConfig.setProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName),
kafkaPropName match {
case ZkSslEndpointIdentificationAlgorithmProp => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => kafkaPropValue match {
case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => kafkaPropValue match {
case list: java.util.List[_] => list.asScala.mkString(",")
case _ => kafkaPropValue.toString
}
@ -124,9 +86,9 @@ object KafkaConfig {
// For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
// with both a client connection socket and a key store location explicitly set.
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).contains("true") &&
zooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
zooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP).contains("true") &&
zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP).isDefined &&
zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP).isDefined
}
/** ********* General Configuration ***********/
@ -439,51 +401,6 @@ object KafkaConfig {
val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable"
/* Documentation */
/** ********* Zookeeper Configuration ***********/
val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " +
"host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " +
"down you can also specify multiple hosts in the form <code>hostname1:port1,hostname2:port2,hostname3:port3</code>.\n" +
"The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " +
"For example to give a chroot path of <code>/chroot/path</code> you would give the connection string as <code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>."
val ZkSessionTimeoutMsDoc = "Zookeeper session timeout"
val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used"
val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking."
val ZkSslClientEnableDoc = "Set client to use TLS when connecting to ZooKeeper." +
" An explicit value overrides any value set via the <code>zookeeper.client.secure</code> system property (note the different name)." +
s" Defaults to false if neither is set; when true, <code>$ZkClientCnxnSocketProp</code> must be set (typically to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code>); other values to set may include " +
ZkSslConfigToSystemPropertyMap.keys.toList.filter(x => x != ZkSslClientEnableProp && x != ZkClientCnxnSocketProp).sorted.mkString("<code>", "</code>, <code>", "</code>")
val ZkClientCnxnSocketDoc = "Typically set to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code> when using TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the same-named <code>${ZkSslConfigToSystemPropertyMap(ZkClientCnxnSocketProp)}</code> system property."
val ZkSslKeyStoreLocationDoc = "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreLocationProp)}</code> system property (note the camelCase)."
val ZkSslKeyStorePasswordDoc = "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStorePasswordProp)}</code> system property (note the camelCase)." +
" Note that ZooKeeper does not support a key password different from the keystore password, so be sure to set the key password in the keystore to be identical to the keystore password; otherwise the connection attempt to Zookeeper will fail."
val ZkSslKeyStoreTypeDoc = "Keystore type when using a client-side certificate with TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreTypeProp)}</code> system property (note the camelCase)." +
" The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the keystore."
val ZkSslTrustStoreLocationDoc = "Truststore location when using TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreLocationProp)}</code> system property (note the camelCase)."
val ZkSslTrustStorePasswordDoc = "Truststore password when using TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStorePasswordProp)}</code> system property (note the camelCase)."
val ZkSslTrustStoreTypeDoc = "Truststore type when using TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreTypeProp)}</code> system property (note the camelCase)." +
" The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the truststore."
val ZkSslProtocolDoc = "Specifies the protocol to be used in ZooKeeper TLS negotiation." +
s" An explicit value overrides any value set via the same-named <code>${ZkSslConfigToSystemPropertyMap(ZkSslProtocolProp)}</code> system property."
val ZkSslEnabledProtocolsDoc = "Specifies the enabled protocol(s) in ZooKeeper TLS negotiation (csv)." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslEnabledProtocolsProp)}</code> system property (note the camelCase)." +
s" The default value of <code>null</code> means the enabled protocol will be the value of the <code>${KafkaConfig.ZkSslProtocolProp}</code> configuration property."
val ZkSslCipherSuitesDoc = "Specifies the enabled cipher suites to be used in ZooKeeper TLS negotiation (csv)." +
s""" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslCipherSuitesProp)}</code> system property (note the single word \"ciphersuites\").""" +
" The default value of <code>null</code> means the list of enabled cipher suites is determined by the Java runtime being used."
val ZkSslEndpointIdentificationAlgorithmDoc = "Specifies whether to enable hostname verification in the ZooKeeper TLS negotiation process, with (case-insensitively) \"https\" meaning ZooKeeper hostname verification is enabled and an explicit blank value meaning it is disabled (disabling it is only recommended for testing purposes)." +
s""" An explicit value overrides any \"true\" or \"false\" value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslEndpointIdentificationAlgorithmProp)}</code> system property (note the different name and values; true implies https and false implies blank)."""
val ZkSslCrlEnableDoc = "Specifies whether to enable Certificate Revocation List in the ZooKeeper TLS protocols." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslCrlEnableProp)}</code> system property (note the shorter name)."
val ZkSslOcspEnableDoc = "Specifies whether to enable Online Certificate Status Protocol in the ZooKeeper TLS protocols." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslOcspEnableProp)}</code> system property (note the shorter name)."
/** ********* General Configuration ***********/
val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
@ -943,25 +860,25 @@ object KafkaConfig {
new ConfigDef()
/** ********* Zookeeper Configuration ***********/
.define(ZkConnectProp, STRING, null, HIGH, ZkConnectDoc)
.define(ZkSessionTimeoutMsProp, INT, Defaults.ZK_SESSION_TIMEOUT_MS, HIGH, ZkSessionTimeoutMsDoc)
.define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc)
.define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZK_ENABLE_SECURE_ACLS, HIGH, ZkEnableSecureAclsDoc)
.define(ZkMaxInFlightRequestsProp, INT, Defaults.ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZkMaxInFlightRequestsDoc)
.define(ZkSslClientEnableProp, BOOLEAN, Defaults.ZK_SSL_CLIENT_ENABLE, MEDIUM, ZkSslClientEnableDoc)
.define(ZkClientCnxnSocketProp, STRING, null, MEDIUM, ZkClientCnxnSocketDoc)
.define(ZkSslKeyStoreLocationProp, STRING, null, MEDIUM, ZkSslKeyStoreLocationDoc)
.define(ZkSslKeyStorePasswordProp, PASSWORD, null, MEDIUM, ZkSslKeyStorePasswordDoc)
.define(ZkSslKeyStoreTypeProp, STRING, null, MEDIUM, ZkSslKeyStoreTypeDoc)
.define(ZkSslTrustStoreLocationProp, STRING, null, MEDIUM, ZkSslTrustStoreLocationDoc)
.define(ZkSslTrustStorePasswordProp, PASSWORD, null, MEDIUM, ZkSslTrustStorePasswordDoc)
.define(ZkSslTrustStoreTypeProp, STRING, null, MEDIUM, ZkSslTrustStoreTypeDoc)
.define(ZkSslProtocolProp, STRING, Defaults.ZK_SSL_PROTOCOL, LOW, ZkSslProtocolDoc)
.define(ZkSslEnabledProtocolsProp, LIST, null, LOW, ZkSslEnabledProtocolsDoc)
.define(ZkSslCipherSuitesProp, LIST, null, LOW, ZkSslCipherSuitesDoc)
.define(ZkSslEndpointIdentificationAlgorithmProp, STRING, Defaults.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, ZkSslEndpointIdentificationAlgorithmDoc)
.define(ZkSslCrlEnableProp, BOOLEAN, Defaults.ZK_SSL_CRL_ENABLE, LOW, ZkSslCrlEnableDoc)
.define(ZkSslOcspEnableProp, BOOLEAN, Defaults.ZK_SSL_OCSP_ENABLE, LOW, ZkSslOcspEnableDoc)
.define(ZkConfigs.ZK_CONNECT_PROP, STRING, null, HIGH, ZkConfigs.ZK_CONNECT_DOC)
.define(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, INT, ZkConfigs.ZK_SESSION_TIMEOUT_MS, HIGH, ZkConfigs.ZK_SESSION_TIMEOUT_MS_DOC)
.define(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, INT, null, HIGH, ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_DOC)
.define(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, BOOLEAN, ZkConfigs.ZK_ENABLE_SECURE_ACLS, HIGH, ZkConfigs.ZK_ENABLE_SECURE_ACLS_DOC)
.define(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP, INT, ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_DOC)
.define(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, BOOLEAN, ZkConfigs.ZK_SSL_CLIENT_ENABLE, MEDIUM, ZkConfigs.ZK_SSL_CLIENT_ENABLE_DOC)
.define(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_DOC)
.define(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_DOC)
.define(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, PASSWORD, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_DOC)
.define(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_TYPE_DOC)
.define(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_DOC)
.define(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, PASSWORD, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_DOC)
.define(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_DOC)
.define(ZkConfigs.ZK_SSL_PROTOCOL_PROP, STRING, ZkConfigs.ZK_SSL_PROTOCOL, LOW, ZkConfigs.ZK_SSL_PROTOCOL_DOC)
.define(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, LIST, null, LOW, ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_DOC)
.define(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, LIST, null, LOW, ZkConfigs.ZK_SSL_CIPHER_SUITES_DOC)
.define(ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, STRING, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, BOOLEAN, ZkConfigs.ZK_SSL_CRL_ENABLE, LOW, ZkConfigs.ZK_SSL_CRL_ENABLE_DOC)
.define(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, BOOLEAN, ZkConfigs.ZK_SSL_OCSP_ENABLE, LOW, ZkConfigs.ZK_SSL_OCSP_ENABLE_DOC)
/** ********* General Configuration ***********/
.define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BROKER_ID_GENERATION_ENABLE, MEDIUM, BrokerIdGenerationEnableDoc)
@ -1417,12 +1334,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
super.valuesWithPrefixOverride(prefix)
/** ********* Zookeeper Configuration ***********/
val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp)
val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_PROP)
val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP)
val zkConnectionTimeoutMs: Int =
Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP))
val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP)
val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP)
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig
@ -1470,21 +1387,21 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
}
val zkSslClientEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslClientEnableProp)
val zkClientCnxnSocketClassName = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkClientCnxnSocketProp)
val zkSslKeyStoreLocation = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreLocationProp)
val zkSslKeyStorePassword = zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslKeyStorePasswordProp)
val zkSslKeyStoreType = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreTypeProp)
val zkSslTrustStoreLocation = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreLocationProp)
val zkSslTrustStorePassword = zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslTrustStorePasswordProp)
val zkSslTrustStoreType = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreTypeProp)
val ZkSslProtocol = zkStringConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslProtocolProp)
val ZkSslEnabledProtocols = zkListConfigOrSystemProperty(KafkaConfig.ZkSslEnabledProtocolsProp)
val ZkSslCipherSuites = zkListConfigOrSystemProperty(KafkaConfig.ZkSslCipherSuitesProp)
val zkSslClientEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP)
val zkClientCnxnSocketClassName = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP)
val zkSslKeyStoreLocation = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP)
val zkSslKeyStorePassword = zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP)
val zkSslKeyStoreType = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP)
val zkSslTrustStoreLocation = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP)
val zkSslTrustStorePassword = zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP)
val zkSslTrustStoreType = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP)
val ZkSslProtocol = zkStringConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_PROTOCOL_PROP)
val ZkSslEnabledProtocols = zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP)
val ZkSslCipherSuites = zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP)
val ZkSslEndpointIdentificationAlgorithm = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false to HTTPS/<blank>
val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
val kafkaProp = ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP
val actuallyProvided = originals.containsKey(kafkaProp)
if (actuallyProvided)
getString(kafkaProp)
@ -1496,8 +1413,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
}
}
val ZkSslCrlEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslCrlEnableProp)
val ZkSslOcspEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslOcspEnableProp)
val ZkSslCrlEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP)
val ZkSslOcspEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP)
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
@ -2040,7 +1957,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
if (requiresZookeeper) {
if (zkConnect == null) {
throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_PROP}` which has no default value.")
}
if (brokerIdGenerationEnable) {
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
@ -2055,7 +1972,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
if (migrationEnabled) {
if (zkConnect == null) {
throw new ConfigException(s"If using `${KafkaConfig.MigrationEnabledProp}` in KRaft mode, `${KafkaConfig.ZkConnectProp}` must also be set.")
throw new ConfigException(s"If using `${KafkaConfig.MigrationEnabledProp}` in KRaft mode, `${ZkConfigs.ZK_CONNECT_PROP}` must also be set.")
}
}
}

View File

@ -57,7 +57,7 @@ import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.fault.LoggingFaultHandler
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@ -80,20 +80,20 @@ object KafkaServer {
def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
val clientConfig = new ZKClientConfig
if (config.zkSslClientEnable || forceZkSslClientEnable) {
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true")
config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, _))
config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, _))
config.zkSslKeyStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStorePasswordProp, x.value))
config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreTypeProp, _))
config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStoreLocationProp, _))
config.zkSslTrustStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStorePasswordProp, x.value))
config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStoreTypeProp, _))
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslProtocolProp, config.ZkSslProtocol)
config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEnabledProtocolsProp, _))
config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCipherSuitesProp, _))
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, config.ZkSslEndpointIdentificationAlgorithm)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString)
KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "true")
config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP, _))
config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, _))
config.zkSslKeyStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, x.value))
config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, _))
config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, _))
config.zkSslTrustStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, x.value))
config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, _))
KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_PROTOCOL_PROP, config.ZkSslProtocol)
config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, _))
config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, _))
KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, config.ZkSslEndpointIdentificationAlgorithm)
KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, config.ZkSslCrlEnable.toString)
KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, config.ZkSslOcspEnable.toString)
}
// The zk sasl is enabled by default so it can produce false error when broker does not intend to use SASL.
if (!JaasUtils.isZkSaslEnabled) clientConfig.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
@ -2349,9 +2349,9 @@ object KafkaZkClient {
if (secureAclsEnabled && !isZkSecurityEnabled)
throw new java.lang.SecurityException(
s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but ZooKeeper client TLS configuration identifying at least " +
s"${KafkaConfig.ZkSslClientEnableProp}, ${KafkaConfig.ZkClientCnxnSocketProp}, and " +
s"${KafkaConfig.ZkSslKeyStoreLocationProp} was not present and the verification of the JAAS login file failed " +
s"${ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP} is true, but ZooKeeper client TLS configuration identifying at least " +
s"${ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP}, ${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP}, and " +
s"${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP} was not present and the verification of the JAAS login file failed " +
s"${JaasUtils.zkSecuritySysConfigString}")
KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@ -75,7 +76,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
import DescribeAuthorizedOperationsTest._
override val brokerCount = 1
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
var client: Admin = _

View File

@ -18,9 +18,9 @@
package kafka.api
import com.yammer.metrics.core.Gauge
import java.util.{Collections, Properties}
import java.util.concurrent.ExecutionException
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
@ -38,11 +38,12 @@ import org.apache.kafka.common.resource._
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth._
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{ValueSource, CsvSource}
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import scala.jdk.CollectionConverters._
@ -156,7 +157,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
} else {
// The next two configuration parameters enable ZooKeeper secure ACLs
// and sets the Kafka authorizer, both necessary to enable security.
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName)
// Set the specific principal that can update ACLs.

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.TestJaasConfig
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@ -43,7 +44,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
private val kafkaServerJaasEntryName =
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "false")
this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
this.serverConfig.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "2.8")
this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")

View File

@ -44,7 +44,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.config.{Defaults, ZkConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
@ -2707,7 +2707,7 @@ object PlaintextAdminIntegrationTest {
var topicConfigEntries2 = Seq(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")).asJava
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, test.brokers.head.config.brokerId.toString)
val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp, "localhost:2181")).asJava
val brokerConfigEntries = Seq(new ConfigEntry(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")).asJava
// Alter configs: first and third are invalid, second is valid
var alterResult = admin.alterConfigs(Map(

View File

@ -12,10 +12,10 @@
*/
package kafka.api
import kafka.server.KafkaConfig
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import kafka.utils.{JaasTestUtils, TestInfoUtils, TestUtils}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
private val kafkaClientSaslMechanism = "PLAIN"
private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))

View File

@ -13,10 +13,10 @@
package kafka.api
import java.util.Locale
import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
@Timeout(600)
@ -26,7 +26,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
private val kafkaServerJaasEntryName =
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "false")
// disable secure acls of zkClient in QuorumTestHarness
override protected def zkAclsEnabled = Some(false)
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@ -46,7 +47,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], classOf[AclAuthorizer])
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))

View File

@ -12,14 +12,14 @@
*/
package kafka.api
import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
@Timeout(600)
class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))

View File

@ -14,10 +14,8 @@ package kafka.api
import java.util
import java.util.concurrent._
import com.yammer.metrics.core.Gauge
import kafka.security.authorizer.AclAuthorizer
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateAclsResult}
import org.apache.kafka.common.acl._
@ -25,6 +23,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, Test}
@ -80,7 +79,7 @@ object SslAdminIntegrationTest {
class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
override val authorizationAdmin = new AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], classOf[AclAuthorizer])
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
override protected def securityProtocol = SecurityProtocol.SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))

View File

@ -60,7 +60,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.ShutdownableThread
@ -122,7 +122,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
properties
} else {
val properties = TestUtils.createBrokerConfig(brokerId, zkConnect)
properties.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
properties.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
properties
}
props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)

View File

@ -25,6 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.Uuid
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions.{assertThrows, fail}
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{Tag, Timeout}
@ -61,7 +62,7 @@ class KafkaServerKRaftRegistrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@ -99,7 +100,7 @@ class KafkaServerKRaftRegistrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.coordinator.group.OffsetConfig
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@ -79,7 +80,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," +
s"$External:PLAINTEXT, $SecureExternal:SASL_SSL")
props.put(KafkaConfig.InterBrokerListenerNameProp, Internal)
props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
props.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism)
props.put(s"${new ListenerName(SecureInternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}",
kafkaServerSaslMechanisms(SecureInternal).mkString(","))

View File

@ -48,7 +48,7 @@ import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
import org.junit.jupiter.api.{Assumptions, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
@ -177,7 +177,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@ -309,7 +309,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@ -443,7 +443,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@ -508,7 +508,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@ -576,7 +576,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@ -636,7 +636,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@ -711,7 +711,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
.setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()

View File

@ -25,6 +25,7 @@ import kafka.utils.TestUtils.assertBadConfigContainingMessage
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
@ -155,7 +156,7 @@ class KafkaTest {
"Missing required configuration `zookeeper.connect` which has no default value.")
// Ensure that no exception is thrown once zookeeper.connect is defined (and we clear controller.listener.names)
propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
propertiesFile.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
propertiesFile.setProperty(KafkaConfig.ControllerListenerNamesProp, "")
KafkaConfig.fromProps(propertiesFile)
}
@ -232,61 +233,61 @@ class KafkaTest {
@Test
def testZkSslClientEnable(): Unit = {
testZkConfig(KafkaConfig.ZkSslClientEnableProp, "zookeeper.ssl.client.enable",
testZkConfig(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "zookeeper.ssl.client.enable",
"zookeeper.client.secure", booleanPropValueToSet, config => Some(config.zkSslClientEnable), booleanPropValueToSet, Some(false))
}
@Test
def testZkSslKeyStoreLocation(): Unit = {
testZkConfig(KafkaConfig.ZkSslKeyStoreLocationProp, "zookeeper.ssl.keystore.location",
testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, "zookeeper.ssl.keystore.location",
"zookeeper.ssl.keyStore.location", stringPropValueToSet, config => config.zkSslKeyStoreLocation, stringPropValueToSet)
}
@Test
def testZkSslTrustStoreLocation(): Unit = {
testZkConfig(KafkaConfig.ZkSslTrustStoreLocationProp, "zookeeper.ssl.truststore.location",
testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, "zookeeper.ssl.truststore.location",
"zookeeper.ssl.trustStore.location", stringPropValueToSet, config => config.zkSslTrustStoreLocation, stringPropValueToSet)
}
@Test
def testZookeeperKeyStorePassword(): Unit = {
testZkConfig(KafkaConfig.ZkSslKeyStorePasswordProp, "zookeeper.ssl.keystore.password",
testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, "zookeeper.ssl.keystore.password",
"zookeeper.ssl.keyStore.password", passwordPropValueToSet, config => config.zkSslKeyStorePassword, new Password(passwordPropValueToSet))
}
@Test
def testZookeeperTrustStorePassword(): Unit = {
testZkConfig(KafkaConfig.ZkSslTrustStorePasswordProp, "zookeeper.ssl.truststore.password",
testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, "zookeeper.ssl.truststore.password",
"zookeeper.ssl.trustStore.password", passwordPropValueToSet, config => config.zkSslTrustStorePassword, new Password(passwordPropValueToSet))
}
@Test
def testZkSslKeyStoreType(): Unit = {
testZkConfig(KafkaConfig.ZkSslKeyStoreTypeProp, "zookeeper.ssl.keystore.type",
testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, "zookeeper.ssl.keystore.type",
"zookeeper.ssl.keyStore.type", stringPropValueToSet, config => config.zkSslKeyStoreType, stringPropValueToSet)
}
@Test
def testZkSslTrustStoreType(): Unit = {
testZkConfig(KafkaConfig.ZkSslTrustStoreTypeProp, "zookeeper.ssl.truststore.type",
testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, "zookeeper.ssl.truststore.type",
"zookeeper.ssl.trustStore.type", stringPropValueToSet, config => config.zkSslTrustStoreType, stringPropValueToSet)
}
@Test
def testZkSslProtocol(): Unit = {
testZkConfig(KafkaConfig.ZkSslProtocolProp, "zookeeper.ssl.protocol",
testZkConfig(ZkConfigs.ZK_SSL_PROTOCOL_PROP, "zookeeper.ssl.protocol",
"zookeeper.ssl.protocol", stringPropValueToSet, config => Some(config.ZkSslProtocol), stringPropValueToSet, Some("TLSv1.2"))
}
@Test
def testZkSslEnabledProtocols(): Unit = {
testZkConfig(KafkaConfig.ZkSslEnabledProtocolsProp, "zookeeper.ssl.enabled.protocols",
testZkConfig(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, "zookeeper.ssl.enabled.protocols",
"zookeeper.ssl.enabledProtocols", listPropValueToSet.mkString(","), config => config.ZkSslEnabledProtocols, listPropValueToSet.asJava)
}
@Test
def testZkSslCipherSuites(): Unit = {
testZkConfig(KafkaConfig.ZkSslCipherSuitesProp, "zookeeper.ssl.cipher.suites",
testZkConfig(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, "zookeeper.ssl.cipher.suites",
"zookeeper.ssl.ciphersuites", listPropValueToSet.mkString(","), config => config.ZkSslCipherSuites, listPropValueToSet.asJava)
}
@ -294,7 +295,7 @@ class KafkaTest {
def testZkSslEndpointIdentificationAlgorithm(): Unit = {
// this property is different than the others
// because the system property values and the Kafka property values don't match
val kafkaPropName = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
val kafkaPropName = ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP
assertEquals("zookeeper.ssl.endpoint.identification.algorithm", kafkaPropName)
val sysProp = "zookeeper.ssl.hostnameVerification"
val expectedDefaultValue = "HTTPS"
@ -327,13 +328,13 @@ class KafkaTest {
@Test
def testZkSslCrlEnable(): Unit = {
testZkConfig(KafkaConfig.ZkSslCrlEnableProp, "zookeeper.ssl.crl.enable",
testZkConfig(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, "zookeeper.ssl.crl.enable",
"zookeeper.ssl.crl", booleanPropValueToSet, config => Some(config.ZkSslCrlEnable), booleanPropValueToSet, Some(false))
}
@Test
def testZkSslOcspEnable(): Unit = {
testZkConfig(KafkaConfig.ZkSslOcspEnableProp, "zookeeper.ssl.ocsp.enable",
testZkConfig(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, "zookeeper.ssl.ocsp.enable",
"zookeeper.ssl.ocsp", booleanPropValueToSet, config => Some(config.ZkSslOcspEnable), booleanPropValueToSet, Some(false))
}

View File

@ -34,6 +34,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_0_IV1, IBP_0_10_2_IV0, IBP_0_9_0, IBP_1_0_IV0, IBP_2_2_IV0, IBP_2_4_IV0, IBP_2_4_IV1, IBP_2_6_IV0, IBP_2_8_IV1, IBP_3_2_IV0, IBP_3_4_IV0}
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -895,7 +896,7 @@ class ControllerChannelManagerTest {
private def createConfig(interBrokerVersion: MetadataVersion): KafkaConfig = {
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, controllerId.toString)
props.put(KafkaConfig.ZkConnectProp, "zkConnect")
props.put(ZkConfigs.ZK_CONNECT_PROP, "zkConnect")
TestUtils.setIbpAndMessageFormatVersions(props, interBrokerVersion)
KafkaConfig.fromProps(props)
}

View File

@ -41,6 +41,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, IBP_2_0_IV1}
import org.apache.kafka.server.config.ZkConfigs
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@ -869,7 +870,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(noTlsProps),
noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala)
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach { propName =>
assertNull(zkClientConfig.getProperty(propName))
}
}
@ -879,27 +880,27 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val props = new java.util.Properties()
val kafkaValue = "kafkaValue"
val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it
KafkaConfig.ZkSslClientEnableProp -> "true",
KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue)
ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true",
ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue)
configs.foreach { case (key, value) => props.put(key, value) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
// confirm we get all the values we expect
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop => prop match {
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP =>
assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP =>
assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslProtocolProp =>
case ZkConfigs.ZK_SSL_PROTOCOL_PROP =>
assertEquals("TLSv1.2", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
})
@ -910,29 +911,29 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val props = new java.util.Properties()
val kafkaValue = "kafkaValue"
val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it
KafkaConfig.ZkSslClientEnableProp -> "true",
KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslProtocolProp -> kafkaValue,
KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue,
KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS",
KafkaConfig.ZkSslCrlEnableProp -> "false",
KafkaConfig.ZkSslOcspEnableProp -> "false")
ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true",
ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_PROTOCOL_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "HTTPS",
ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "false",
ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "false")
configs.foreach{case (key, value) => props.put(key, value.toString) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
// confirm we get all the values we expect
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop => prop match {
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP =>
assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP =>
assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
})
@ -945,43 +946,43 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val prefixedValue = "prefixedValue"
val prefix = "authorizer."
val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it
KafkaConfig.ZkSslClientEnableProp -> "false",
KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslProtocolProp -> kafkaValue,
KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue,
KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS",
KafkaConfig.ZkSslCrlEnableProp -> "false",
KafkaConfig.ZkSslOcspEnableProp -> "false",
prefix + KafkaConfig.ZkSslClientEnableProp -> "true",
prefix + KafkaConfig.ZkClientCnxnSocketProp -> prefixedValue,
prefix + KafkaConfig.ZkSslKeyStoreLocationProp -> prefixedValue,
prefix + KafkaConfig.ZkSslKeyStorePasswordProp -> prefixedValue,
prefix + KafkaConfig.ZkSslKeyStoreTypeProp -> prefixedValue,
prefix + KafkaConfig.ZkSslTrustStoreLocationProp -> prefixedValue,
prefix + KafkaConfig.ZkSslTrustStorePasswordProp -> prefixedValue,
prefix + KafkaConfig.ZkSslTrustStoreTypeProp -> prefixedValue,
prefix + KafkaConfig.ZkSslProtocolProp -> prefixedValue,
prefix + KafkaConfig.ZkSslEnabledProtocolsProp -> prefixedValue,
prefix + KafkaConfig.ZkSslCipherSuitesProp -> prefixedValue,
prefix + KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "",
prefix + KafkaConfig.ZkSslCrlEnableProp -> "true",
prefix + KafkaConfig.ZkSslOcspEnableProp -> "true")
ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "false",
ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_PROTOCOL_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue,
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "HTTPS",
ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "false",
ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "false",
prefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true",
prefix + ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_PROTOCOL_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> prefixedValue,
prefix + ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "",
prefix + ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "true",
prefix + ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "true")
configs.foreach{case (key, value) => props.put(key, value.toString) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
// confirm we get all the values we expect
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop => prop match {
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP =>
assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP =>
assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case _ => assertEquals(prefixedValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
})

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.config.{Defaults, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
@ -211,7 +211,7 @@ class DynamicBrokerConfigTest {
val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix)
val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_PROP -> "somehost:2181")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps)
// Test update of configs with invalid type
@ -709,7 +709,7 @@ class DynamicBrokerConfigTest {
@Test
def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
val props = new Properties()
props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
props.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.put(KafkaConfig.MetadataLogSegmentMinBytesProp, "1024")
val config = new KafkaConfig(props)
assertFalse(config.nonInternalValues.containsKey(KafkaConfig.MetadataLogSegmentMinBytesProp))

View File

@ -38,7 +38,7 @@ import org.apache.kafka.common.Node
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.config.{ServerTopicConfigSynonyms, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.function.Executable
@ -155,7 +155,7 @@ class KafkaConfigTest {
val hostName = "fake-host"
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
val serverConfig = KafkaConfig.fromProps(props)
@ -186,7 +186,7 @@ class KafkaConfigTest {
def testDuplicateListeners(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
// listeners with duplicate port
props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
@ -212,7 +212,7 @@ class KafkaConfigTest {
def testIPv4AndIPv6SamePortListeners(): Unit = {
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, "1")
props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
props.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://[::1]:9092,SSL://[::1]:9092")
var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
@ -452,7 +452,7 @@ class KafkaConfigTest {
def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
assertBadConfigContainingMessage(props,
"Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER")
@ -465,7 +465,7 @@ class KafkaConfigTest {
def testBadListenerProtocol(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "BAD://localhost:9091")
assertFalse(isValidKafkaConfig(props))
@ -475,7 +475,7 @@ class KafkaConfigTest {
def testListenerNamesWithAdvertisedListenerUnset(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
@ -499,7 +499,7 @@ class KafkaConfigTest {
def testListenerAndAdvertisedListenerNames(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
props.setProperty(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
@ -530,7 +530,7 @@ class KafkaConfigTest {
def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092")
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
@ -541,7 +541,7 @@ class KafkaConfigTest {
def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
@ -552,7 +552,7 @@ class KafkaConfigTest {
def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
@ -564,7 +564,7 @@ class KafkaConfigTest {
def testCaseInsensitiveListenerProtocol(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")
val config = KafkaConfig.fromProps(props)
assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
@ -579,7 +579,7 @@ class KafkaConfigTest {
def testListenerDefaults(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
// configuration with no listeners
val conf = KafkaConfig.fromProps(props)
@ -593,7 +593,7 @@ class KafkaConfigTest {
def testVersionConfiguration(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
val conf = KafkaConfig.fromProps(props)
assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion)
@ -766,7 +766,7 @@ class KafkaConfigTest {
def testFromPropsInvalid(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
validRequiredProperties
}
// to ensure a basis is valid - bootstraps all needed validation
@ -774,25 +774,25 @@ class KafkaConfigTest {
KafkaConfig.configNames.foreach { name =>
name match {
case KafkaConfig.ZkConnectProp => // ignore string
case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.ZkSslClientEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case KafkaConfig.ZkClientCnxnSocketProp => //ignore string
case KafkaConfig.ZkSslKeyStoreLocationProp => //ignore string
case KafkaConfig.ZkSslKeyStorePasswordProp => //ignore string
case KafkaConfig.ZkSslKeyStoreTypeProp => //ignore string
case KafkaConfig.ZkSslTrustStoreLocationProp => //ignore string
case KafkaConfig.ZkSslTrustStorePasswordProp => //ignore string
case KafkaConfig.ZkSslTrustStoreTypeProp => //ignore string
case KafkaConfig.ZkSslProtocolProp => //ignore string
case KafkaConfig.ZkSslEnabledProtocolsProp => //ignore string
case KafkaConfig.ZkSslCipherSuitesProp => //ignore string
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => //ignore string
case KafkaConfig.ZkSslCrlEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case KafkaConfig.ZkSslOcspEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_CONNECT_PROP => // ignore string
case ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP => //ignore string
case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP => //ignore string
case ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP => //ignore string
case ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP => //ignore string
case ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP => //ignore string
case ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP => //ignore string
case ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP => //ignore string
case ZkConfigs.ZK_SSL_PROTOCOL_PROP => //ignore string
case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP => //ignore string
case ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => //ignore string
case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => //ignore string
case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case KafkaConfig.BrokerIdProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
@ -1051,7 +1051,7 @@ class KafkaConfigTest {
def testDynamicLogConfigs(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
validRequiredProperties
}
@ -1141,9 +1141,9 @@ class KafkaConfigTest {
@Test
def testSpecificProperties(): Unit = {
val defaults = new Properties()
defaults.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
defaults.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
// For ZkConnectionTimeoutMs
defaults.setProperty(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, "1234")
defaults.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
defaults.setProperty(KafkaConfig.MaxReservedBrokerIdProp, "1")
defaults.setProperty(KafkaConfig.BrokerIdProp, "1")
@ -1187,7 +1187,7 @@ class KafkaConfigTest {
@Test
def testNonroutableAdvertisedListeners(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
assertFalse(isValidKafkaConfig(props))
}
@ -1601,7 +1601,7 @@ class KafkaConfigTest {
@Test
def testSaslJwksEndpointRetryDefaults(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
val config = KafkaConfig.fromProps(props)
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp))
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp))
@ -1769,7 +1769,7 @@ class KafkaConfigTest {
"If using `zookeeper.metadata.migration.enable` in KRaft mode, `zookeeper.connect` must also be set.",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)
props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
KafkaConfig.fromProps(props)
}

View File

@ -25,8 +25,11 @@ import org.junit.jupiter.api.Test
import java.util.Properties
import java.net.{InetAddress, ServerSocket}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import scala.jdk.CollectionConverters._
class KafkaServerTest extends QuorumTestHarness {
@Test
@ -64,7 +67,7 @@ class KafkaServerTest extends QuorumTestHarness {
@Test
def testCreatesProperZkConfigWhenSaslDisabled(): Unit = {
val props = new Properties
props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
assertEquals("false", zkClientConfig.getProperty(JaasUtils.ZK_SASL_CLIENT))
}
@ -72,10 +75,10 @@ class KafkaServerTest extends QuorumTestHarness {
@Test
def testCreatesProperZkTlsConfigWhenDisabled(): Unit = {
val props = new Properties
props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
props.put(KafkaConfig.ZkSslClientEnableProp, "false")
props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out
props.put(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "false")
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach { propName =>
assertNull(zkClientConfig.getProperty(propName))
}
}
@ -83,51 +86,51 @@ class KafkaServerTest extends QuorumTestHarness {
@Test
def testCreatesProperZkTlsConfigWithTrueValues(): Unit = {
val props = new Properties
props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out
// should get correct config for all properties if TLS is enabled
val someValue = "some_value"
def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true"
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "HTTPS"
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "true"
case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "HTTPS"
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
// now check to make sure the values were set correctly
def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true"
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "true"
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "true"
case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "true"
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp =>
assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaProp))))
}
@Test
def testCreatesProperZkTlsConfigWithFalseAndListValues(): Unit = {
val props = new Properties
props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out
// should get correct config for all properties if TLS is enabled
val someValue = "some_value"
def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp => "true"
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "false"
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => ""
case KafkaConfig.ZkSslEnabledProtocolsProp | KafkaConfig.ZkSslCipherSuitesProp => "A,B"
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => "true"
case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "false"
case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => ""
case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => "A,B"
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
// now check to make sure the values were set correctly
def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp => "true"
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "false"
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "false"
case KafkaConfig.ZkSslEnabledProtocolsProp | KafkaConfig.ZkSslCipherSuitesProp => "A,B"
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => "true"
case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "false"
case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "false"
case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => "A,B"
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp =>
assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaProp))))
}
@Test

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
@ -150,8 +151,8 @@ class ServerShutdownTest extends KafkaServerTestHarness {
shutdownKRaftController()
verifyCleanShutdownAfterFailedStartup[CancellationException]
} else {
propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
propsToChangeUponRestart.setProperty(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, "50")
propsToChangeUponRestart.setProperty(ZkConfigs.ZK_CONNECT_PROP, "some.invalid.hostname.foo.bar.local:65535")
verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException]
}
}

View File

@ -20,6 +20,7 @@ import java.util.Properties
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.MetricsContext
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -54,7 +55,7 @@ class ServerTest {
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, brokerId.toString)
props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:0")
props.put(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:0")
val config = KafkaConfig.fromProps(props)
val context = Server.createKafkaMetricsContext(config, clusterId)

View File

@ -74,7 +74,7 @@ import org.apache.kafka.metadata.properties.MetaProperties
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.config.{Defaults, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
@ -359,8 +359,8 @@ object TestUtils extends Logging {
// controllerQuorumVotersFuture instead.
props.put(KafkaConfig.QuorumVotersProp, "1000@localhost:0")
} else {
props.put(KafkaConfig.ZkConnectProp, zkConnect)
props.put(KafkaConfig.ZkConnectionTimeoutMsProp, "10000")
props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect)
props.put(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, "10000")
}
props.put(KafkaConfig.ReplicaSocketTimeoutMsProp, "1500")
props.put(KafkaConfig.ControllerSocketTimeoutMsProp, "1500")

View File

@ -44,7 +44,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.{Code, NoAuthException, NoNodeException, NodeExistsException}
import org.apache.zookeeper.{CreateMode, ZooDefs}
@ -106,7 +106,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
// TLS connectivity itself is tested in system tests rather than here to avoid having to add TLS support
// to kafka.zk.EmbeddedZookeeper
val clientConfig = new ZKClientConfig()
val propKey = KafkaConfig.ZkClientCnxnSocketProp
val propKey = ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP
val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,

View File

@ -21,6 +21,7 @@ import kafka.zk.ZkMigrationClient
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import java.util.Properties
@ -40,7 +41,7 @@ class ZkMigrationTestHarness extends QuorumTestHarness {
val encoder: PasswordEncoder = {
val encoderProps = new Properties()
encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation
encoderProps.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:1234") // Get around the config validation
encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the
val encoderConfig = new KafkaConfig(encoderProps)
PasswordEncoder.encrypting(encoderConfig.passwordEncoderSecret.get,

View File

@ -27,6 +27,7 @@ import kafka.utils.TestUtils
import kafka.server.QuorumTestHarness
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
@ -102,7 +103,7 @@ class ZooKeeperClientTest extends QuorumTestHarness {
// TLS connectivity itself is tested in system tests rather than here to avoid having to add TLS support
// to kafka.zk.EmbeddedZookeeper
val clientConfig = new ZKClientConfig()
val propKey = KafkaConfig.ZkClientCnxnSocketProp
val propKey = ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP
val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
val client = newZooKeeperClient(clientConfig = clientConfig)

View File

@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ZkConfigs;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@ -176,7 +177,7 @@ public class MetadataRequestBenchmark {
private KafkaApis createKafkaApis() {
Properties kafkaProps = new Properties();
kafkaProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), "zk");
kafkaProps.put(ZkConfigs.ZK_CONNECT_PROP, "zk");
kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
KafkaConfig config = new KafkaConfig(kafkaProps);
return new KafkaApisBuilder().

View File

@ -44,16 +44,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class Defaults {
/** ********* Zookeeper Configuration *********/
public static final int ZK_SESSION_TIMEOUT_MS = 18000;
public static final boolean ZK_ENABLE_SECURE_ACLS = false;
public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10;
public static final boolean ZK_SSL_CLIENT_ENABLE = false;
public static final String ZK_SSL_PROTOCOL = "TLSv1.2";
public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS";
public static final boolean ZK_SSL_CRL_ENABLE = false;
public static final boolean ZK_SSL_OCSP_ENABLE = false;
/** ********* General Configuration *********/
public static final boolean BROKER_ID_GENERATION_ENABLE = true;
public static final int MAX_RESERVED_BROKER_ID = 1000;

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.config;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
public final class ZkConfigs {
/** ********* Zookeeper Configuration ***********/
public static final String ZK_CONNECT_PROP = "zookeeper.connect";
public static final String ZK_SESSION_TIMEOUT_MS_PROP = "zookeeper.session.timeout.ms";
public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = "zookeeper.connection.timeout.ms";
public static final String ZK_ENABLE_SECURE_ACLS_PROP = "zookeeper.set.acl";
public static final String ZK_MAX_IN_FLIGHT_REQUESTS_PROP = "zookeeper.max.in.flight.requests";
public static final String ZK_SSL_CLIENT_ENABLE_PROP = "zookeeper.ssl.client.enable";
public static final String ZK_CLIENT_CNXN_SOCKET_PROP = "zookeeper.clientCnxnSocket";
public static final String ZK_SSL_KEY_STORE_LOCATION_PROP = "zookeeper.ssl.keystore.location";
public static final String ZK_SSL_KEY_STORE_PASSWORD_PROP = "zookeeper.ssl.keystore.password";
public static final String ZK_SSL_KEY_STORE_TYPE_PROP = "zookeeper.ssl.keystore.type";
public static final String ZK_SSL_TRUST_STORE_LOCATION_PROP = "zookeeper.ssl.truststore.location";
public static final String ZK_SSL_TRUST_STORE_PASSWORD_PROP = "zookeeper.ssl.truststore.password";
public static final String ZK_SSL_TRUST_STORE_TYPE_PROP = "zookeeper.ssl.truststore.type";
public static final String ZK_SSL_PROTOCOL_PROP = "zookeeper.ssl.protocol";
public static final String ZK_SSL_ENABLED_PROTOCOLS_PROP = "zookeeper.ssl.enabled.protocols";
public static final String ZK_SSL_CIPHER_SUITES_PROP = "zookeeper.ssl.cipher.suites";
public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP = "zookeeper.ssl.endpoint.identification.algorithm";
public static final String ZK_SSL_CRL_ENABLE_PROP = "zookeeper.ssl.crl.enable";
public static final String ZK_SSL_OCSP_ENABLE_PROP = "zookeeper.ssl.ocsp.enable";
public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " +
"host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " +
"down you can also specify multiple hosts in the form <code>hostname1:port1,hostname2:port2,hostname3:port3</code>.\n" +
"The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " +
"For example to give a chroot path of <code>/chroot/path</code> you would give the connection string as <code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code>.";
public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout";
public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZK_SESSION_TIMEOUT_MS_PROP + " is used";
public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use secure ACLs";
public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking.";
public static final String ZK_SSL_CLIENT_ENABLE_DOC;
public static final String ZK_CLIENT_CNXN_SOCKET_DOC;
public static final String ZK_SSL_KEY_STORE_LOCATION_DOC;
public static final String ZK_SSL_KEY_STORE_PASSWORD_DOC;
public static final String ZK_SSL_KEY_STORE_TYPE_DOC;
public static final String ZK_SSL_TRUST_STORE_LOCATION_DOC;
public static final String ZK_SSL_TRUST_STORE_PASSWORD_DOC;
public static final String ZK_SSL_TRUST_STORE_TYPE_DOC;
public static final String ZK_SSL_PROTOCOL_DOC;
public static final String ZK_SSL_ENABLED_PROTOCOLS_DOC;
public static final String ZK_SSL_CIPHER_SUITES_DOC;
public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC;
public static final String ZK_SSL_CRL_ENABLE_DOC;
public static final String ZK_SSL_OCSP_ENABLE_DOC;
// a map from the Kafka config to the corresponding ZooKeeper Java system property
public static final Map<String, String> ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP;
public static final int ZK_SESSION_TIMEOUT_MS = 18000;
public static final boolean ZK_ENABLE_SECURE_ACLS = false;
public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10;
public static final boolean ZK_SSL_CLIENT_ENABLE = false;
public static final String ZK_SSL_PROTOCOL = "TLSv1.2";
public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS";
public static final boolean ZK_SSL_CRL_ENABLE = false;
public static final boolean ZK_SSL_OCSP_ENABLE = false;
// See ZKClientConfig.SECURE_CLIENT
private static final String SECURE_CLIENT = "zookeeper.client.secure";
// See ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET
private static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
static {
Map<String, String> zkSslConfigToSystemPropertyMap = new HashMap<>();
zkSslConfigToSystemPropertyMap.put(ZK_SSL_CLIENT_ENABLE_PROP, SECURE_CLIENT);
zkSslConfigToSystemPropertyMap.put(ZK_CLIENT_CNXN_SOCKET_PROP, ZOOKEEPER_CLIENT_CNXN_SOCKET);
zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_LOCATION_PROP, "zookeeper.ssl.keyStore.location");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_PASSWORD_PROP, "zookeeper.ssl.keyStore.password");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_TYPE_PROP, "zookeeper.ssl.keyStore.type");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_LOCATION_PROP, "zookeeper.ssl.trustStore.location");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_PASSWORD_PROP, "zookeeper.ssl.trustStore.password");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_TYPE_PROP, "zookeeper.ssl.trustStore.type");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_PROTOCOL_PROP, "zookeeper.ssl.protocol");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENABLED_PROTOCOLS_PROP, "zookeeper.ssl.enabledProtocols");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_CIPHER_SUITES_PROP, "zookeeper.ssl.ciphersuites");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, "zookeeper.ssl.hostnameVerification");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_CRL_ENABLE_PROP, "zookeeper.ssl.crl");
zkSslConfigToSystemPropertyMap.put(ZK_SSL_OCSP_ENABLE_PROP, "zookeeper.ssl.ocsp");
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP = Collections.unmodifiableMap(zkSslConfigToSystemPropertyMap);
ZK_SSL_CLIENT_ENABLE_DOC = "Set client to use TLS when connecting to ZooKeeper." +
" An explicit value overrides any value set via the <code>zookeeper.client.secure</code> system property (note the different name)." +
" Defaults to false if neither is set; when true, <code>" + ZK_CLIENT_CNXN_SOCKET_PROP + "</code> must be set (typically to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code>); other values to set may include " +
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.keySet().stream().filter(x -> !x.equals(ZK_SSL_CLIENT_ENABLE_PROP) && !x.equals(ZK_CLIENT_CNXN_SOCKET_PROP)).sorted().collect(Collectors.joining("<code>", "</code>, <code>", "</code>"));
ZK_CLIENT_CNXN_SOCKET_DOC = "Typically set to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code> when using TLS connectivity to ZooKeeper." +
" Overrides any explicit value set via the same-named <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_CLIENT_CNXN_SOCKET_PROP) + "</code> system property.";
ZK_SSL_KEY_STORE_LOCATION_DOC = "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_LOCATION_PROP) + "</code> system property (note the camelCase).";
ZK_SSL_KEY_STORE_PASSWORD_DOC = "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_PASSWORD_PROP) + "</code> system property (note the camelCase)." +
" Note that ZooKeeper does not support a key password different from the keystore password, so be sure to set the key password in the keystore to be identical to the keystore password; otherwise the connection attempt to Zookeeper will fail.";
ZK_SSL_KEY_STORE_TYPE_DOC = "Keystore type when using a client-side certificate with TLS connectivity to ZooKeeper." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_TYPE_PROP) + "</code> system property (note the camelCase)." +
" The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the keystore.";
ZK_SSL_TRUST_STORE_LOCATION_DOC = "Truststore location when using TLS connectivity to ZooKeeper." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_LOCATION_PROP) + "</code> system property (note the camelCase).";
ZK_SSL_TRUST_STORE_PASSWORD_DOC = "Truststore password when using TLS connectivity to ZooKeeper." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_PASSWORD_PROP) + "</code> system property (note the camelCase).";
ZK_SSL_TRUST_STORE_TYPE_DOC = "Truststore type when using TLS connectivity to ZooKeeper." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_TYPE_PROP) + "</code> system property (note the camelCase)." +
" The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the truststore.";
ZK_SSL_PROTOCOL_DOC = "Specifies the protocol to be used in ZooKeeper TLS negotiation." +
" An explicit value overrides any value set via the same-named <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_PROTOCOL_PROP) + "</code> system property.";
ZK_SSL_ENABLED_PROTOCOLS_DOC = "Specifies the enabled protocol(s) in ZooKeeper TLS negotiation (csv)." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_ENABLED_PROTOCOLS_PROP) + "</code> system property (note the camelCase)." +
" The default value of <code>null</code> means the enabled protocol will be the value of the <code>" + ZK_SSL_PROTOCOL_PROP + "</code> configuration property.";
ZK_SSL_CIPHER_SUITES_DOC = "Specifies the enabled cipher suites to be used in ZooKeeper TLS negotiation (csv)." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_CIPHER_SUITES_PROP) + "</code> system property (note the single word \"ciphersuites\")." +
" The default value of <code>null</code> means the list of enabled cipher suites is determined by the Java runtime being used.";
ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "Specifies whether to enable hostname verification in the ZooKeeper TLS negotiation process, with (case-insensitively) \"https\" meaning ZooKeeper hostname verification is enabled and an explicit blank value meaning it is disabled (disabling it is only recommended for testing purposes)." +
" An explicit value overrides any \"true\" or \"false\" value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP) + "</code> system property (note the different name and values; true implies https and false implies blank).";
ZK_SSL_CRL_ENABLE_DOC = "Specifies whether to enable Certificate Revocation List in the ZooKeeper TLS protocols." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_CRL_ENABLE_PROP) + "</code> system property (note the shorter name).";
ZK_SSL_OCSP_ENABLE_DOC = "Specifies whether to enable Online Certificate Status Protocol in the ZooKeeper TLS protocols." +
" Overrides any explicit value set via the <code>" + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_OCSP_ENABLE_PROP) + "</code> system property (note the shorter name).";
}
}

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.config.ConfigType;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestCondition;
@ -109,7 +110,7 @@ public class EmbeddedKafkaCluster {
zookeeper = new EmbeddedZookeeper();
log.debug("ZooKeeper instance is running at {}", zKConnectString());
brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
brokerConfig.put(ZkConfigs.ZK_CONNECT_PROP, zKConnectString());
putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.server.util.MockTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -90,7 +91,7 @@ public class KafkaEmbedded {
effectiveConfig.put(KafkaConfig.AutoCreateTopicsEnableProp(), true);
effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 1000000);
effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true);
effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 10000);
effectiveConfig.put(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, 10000);
effectiveConfig.putAll(initialConfig);
effectiveConfig.setProperty(KafkaConfig.LogDirProp(), logDir.getAbsolutePath());