diff --git a/build.gradle b/build.gradle
index 489473c885a..0564d27a013 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2160,6 +2160,7 @@ project(':streams') {
testImplementation project(':storage')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
+ testImplementation project(':server')
testImplementation libs.log4j
testImplementation libs.junitJupiter
testImplementation libs.junitVintageEngine
@@ -2742,6 +2743,7 @@ project(':jmh-benchmarks') {
exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
}
implementation project(':server-common')
+ implementation project(':server')
implementation project(':clients')
implementation project(':group-coordinator')
implementation project(':metadata')
@@ -2899,7 +2901,7 @@ project(':connect:json') {
api libs.jacksonAfterburner
implementation libs.slf4jApi
-
+
testImplementation libs.junitJupiter
testRuntimeOnly libs.slf4jlog4j
@@ -2969,6 +2971,7 @@ project(':connect:runtime') {
testImplementation project(':metadata')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
+ testImplementation project(':server')
testImplementation project(':storage')
testImplementation project(':connect:test-plugins')
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8bbf5728212..616e0e519c6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -605,6 +605,7 @@
+
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index c15aa27ae59..8a06c8e0b0a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,7 +155,7 @@ public class EmbeddedKafkaCluster {
}
private void doStart() {
- brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
+ brokerConfig.put(ZkConfigs.ZK_CONNECT_PROP, zKConnectString());
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 2e867d76dd2..7120d0d8c34 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -21,7 +21,6 @@ import java.util.Properties
import joptsimple._
import joptsimple.util.EnumConverter
import kafka.security.authorizer.{AclAuthorizer, AclEntry}
-import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.acl._
@@ -33,6 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
@@ -200,7 +200,7 @@ object AclCommand extends Logging {
// We will default the value of zookeeper.set.acl to true or false based on whether SASL is configured,
// but if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication
// then it will be up to the user to explicitly specify zookeeper.set.acl=true in the authorizer-properties.
- val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSaslEnabled)
+ val defaultProps = Map(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP -> JaasUtils.isZkSaslEnabled)
val authorizerPropertiesWithoutTls =
if (opts.options.has(opts.authorizerPropertiesOpt)) {
val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt)
@@ -211,7 +211,7 @@ object AclCommand extends Logging {
val authorizerProperties =
if (opts.options.has(opts.zkTlsConfigFile)) {
// load in TLS configs both with and without the "authorizer." prefix
- val validKeys = (KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList ++ KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.map("authorizer." + _).toList).asJava
+ val validKeys = (ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList ++ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.map("authorizer." + _).toList).asJava
authorizerPropertiesWithoutTls ++ Utils.loadProps(opts.options.valueOf(opts.zkTlsConfigFile), validKeys).asInstanceOf[java.util.Map[String, Any]].asScala
}
else
@@ -619,7 +619,7 @@ object AclCommand extends Logging {
"DEPRECATED: Identifies the file where ZooKeeper client TLS connectivity properties are defined for" +
" the default authorizer kafka.security.authorizer.AclAuthorizer." +
" Any properties other than the following (with or without an \"authorizer.\" prefix) are ignored: " +
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") +
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(", ") +
". Note that if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication being used" +
" then it is necessary to explicitly specify --authorizer-properties zookeeper.set.acl=true. " +
AclCommand.AuthorizerDeprecationMessage)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index d03d7dfb9b5..a10722f000f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
-import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
+import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals, ZkConfigs}
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
@@ -864,7 +864,7 @@ object ConfigCommand extends Logging {
.ofType(classOf[String])
val zkTlsConfigFile: OptionSpec[String] = parser.accepts("zk-tls-config-file",
"Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " +
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") + " are ignored.")
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.sorted.mkString(", ") + " are ignored.")
.withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String])
options = parser.parse(args : _*)
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 9c81bb23e2e..dd07f522e77 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -24,6 +24,7 @@ import kafka.utils.Implicits._
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.KeeperException
@@ -81,7 +82,7 @@ object ZkSecurityMigrator extends Logging {
if (jaasFile == null && !tlsClientAuthEnabled) {
val errorMsg = s"No JAAS configuration file has been specified and no TLS client certificate has been specified. Please make sure that you set " +
s"the system property ${JaasUtils.JAVA_LOGIN_CONFIG_PARAM} or provide a ZooKeeper client TLS configuration via --$tlsConfigFileOption " +
- s"identifying at least ${KafkaConfig.ZkSslClientEnableProp}, ${KafkaConfig.ZkClientCnxnSocketProp}, and ${KafkaConfig.ZkSslKeyStoreLocationProp}"
+ s"identifying at least ${ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP}, ${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP}, and ${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP}"
System.err.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}
@@ -124,7 +125,7 @@ object ZkSecurityMigrator extends Logging {
}
def createZkClientConfigFromFile(filename: String) : ZKClientConfig = {
- val zkTlsConfigFileProps = Utils.loadProps(filename, KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.asJava)
+ val zkTlsConfigFileProps = Utils.loadProps(filename, ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.toList.asJava)
val zkClientConfig = new ZKClientConfig() // Initializes based on any system properties that have been set
// Now override any set system properties with explicitly-provided values from the config file
// Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make
@@ -156,7 +157,7 @@ object ZkSecurityMigrator extends Logging {
"before migration. If not, exit the command.")
val zkTlsConfigFile: OptionSpec[String] = parser.accepts(tlsConfigFileOption,
"Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " +
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.mkString(", ") + " are ignored.")
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.mkString(", ") + " are ignored.")
.withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String])
options = parser.parse(args : _*)
}
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 99642b33c2b..dc307ca702f 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV1
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.zookeeper.client.ZKClientConfig
import scala.annotation.nowarn
@@ -96,7 +97,7 @@ object AclAuthorizer {
}
private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
- val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
+ val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP).
map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
if (!zkSslClientEnable)
new ZKClientConfig
@@ -105,10 +106,10 @@ object AclAuthorizer {
// be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, forceZkSslClientEnable = true)
// add in any prefixed overlays
- KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) =>
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.forKeyValue { (kafkaProp, sysProp) =>
configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
zkClientConfig.setProperty(sysProp,
- if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
+ if (kafkaProp == ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP)
(prefixedValue.toString.trim.toUpperCase == "HTTPS").toString
else
prefixedValue.toString.trim)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4290ce1da8d..cec5f5649f6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Csv
@@ -67,53 +67,15 @@ object KafkaConfig {
DynamicBrokerConfig.dynamicConfigUpdateModes))
}
- /** ********* Zookeeper Configuration ***********/
- val ZkConnectProp = "zookeeper.connect"
- val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
- val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
- val ZkEnableSecureAclsProp = "zookeeper.set.acl"
- val ZkMaxInFlightRequestsProp = "zookeeper.max.in.flight.requests"
- val ZkSslClientEnableProp = "zookeeper.ssl.client.enable"
- val ZkClientCnxnSocketProp = "zookeeper.clientCnxnSocket"
- val ZkSslKeyStoreLocationProp = "zookeeper.ssl.keystore.location"
- val ZkSslKeyStorePasswordProp = "zookeeper.ssl.keystore.password"
- val ZkSslKeyStoreTypeProp = "zookeeper.ssl.keystore.type"
- val ZkSslTrustStoreLocationProp = "zookeeper.ssl.truststore.location"
- val ZkSslTrustStorePasswordProp = "zookeeper.ssl.truststore.password"
- val ZkSslTrustStoreTypeProp = "zookeeper.ssl.truststore.type"
- val ZkSslProtocolProp = "zookeeper.ssl.protocol"
- val ZkSslEnabledProtocolsProp = "zookeeper.ssl.enabled.protocols"
- val ZkSslCipherSuitesProp = "zookeeper.ssl.cipher.suites"
- val ZkSslEndpointIdentificationAlgorithmProp = "zookeeper.ssl.endpoint.identification.algorithm"
- val ZkSslCrlEnableProp = "zookeeper.ssl.crl.enable"
- val ZkSslOcspEnableProp = "zookeeper.ssl.ocsp.enable"
-
- // a map from the Kafka config to the corresponding ZooKeeper Java system property
- private[kafka] val ZkSslConfigToSystemPropertyMap: Map[String, String] = Map(
- ZkSslClientEnableProp -> ZKClientConfig.SECURE_CLIENT,
- ZkClientCnxnSocketProp -> ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
- ZkSslKeyStoreLocationProp -> "zookeeper.ssl.keyStore.location",
- ZkSslKeyStorePasswordProp -> "zookeeper.ssl.keyStore.password",
- ZkSslKeyStoreTypeProp -> "zookeeper.ssl.keyStore.type",
- ZkSslTrustStoreLocationProp -> "zookeeper.ssl.trustStore.location",
- ZkSslTrustStorePasswordProp -> "zookeeper.ssl.trustStore.password",
- ZkSslTrustStoreTypeProp -> "zookeeper.ssl.trustStore.type",
- ZkSslProtocolProp -> "zookeeper.ssl.protocol",
- ZkSslEnabledProtocolsProp -> "zookeeper.ssl.enabledProtocols",
- ZkSslCipherSuitesProp -> "zookeeper.ssl.ciphersuites",
- ZkSslEndpointIdentificationAlgorithmProp -> "zookeeper.ssl.hostnameVerification",
- ZkSslCrlEnableProp -> "zookeeper.ssl.crl",
- ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp")
-
private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
- Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName)))
+ Option(clientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName)))
}
private[kafka] def setZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String, kafkaPropValue: Any): Unit = {
- clientConfig.setProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName),
+ clientConfig.setProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName),
kafkaPropName match {
- case ZkSslEndpointIdentificationAlgorithmProp => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
- case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => kafkaPropValue match {
+ case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
+ case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => kafkaPropValue match {
case list: java.util.List[_] => list.asScala.mkString(",")
case _ => kafkaPropValue.toString
}
@@ -124,9 +86,9 @@ object KafkaConfig {
// For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
// with both a client connection socket and a key store location explicitly set.
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
- zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).contains("true") &&
- zooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
- zooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
+ zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP).contains("true") &&
+ zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP).isDefined &&
+ zooKeeperClientProperty(zkClientConfig, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP).isDefined
}
/** ********* General Configuration ***********/
@@ -439,51 +401,6 @@ object KafkaConfig {
val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable"
/* Documentation */
- /** ********* Zookeeper Configuration ***********/
- val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form hostname:port where host and port are the " +
- "host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " +
- "down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.\n" +
- "The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " +
- "For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path."
- val ZkSessionTimeoutMsDoc = "Zookeeper session timeout"
- val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used"
- val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
- val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking."
- val ZkSslClientEnableDoc = "Set client to use TLS when connecting to ZooKeeper." +
- " An explicit value overrides any value set via the zookeeper.client.secure system property (note the different name)." +
- s" Defaults to false if neither is set; when true, $ZkClientCnxnSocketProp must be set (typically to org.apache.zookeeper.ClientCnxnSocketNetty); other values to set may include " +
- ZkSslConfigToSystemPropertyMap.keys.toList.filter(x => x != ZkSslClientEnableProp && x != ZkClientCnxnSocketProp).sorted.mkString("", ", ", "")
- val ZkClientCnxnSocketDoc = "Typically set to org.apache.zookeeper.ClientCnxnSocketNetty when using TLS connectivity to ZooKeeper." +
- s" Overrides any explicit value set via the same-named ${ZkSslConfigToSystemPropertyMap(ZkClientCnxnSocketProp)} system property."
- val ZkSslKeyStoreLocationDoc = "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper." +
- s" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreLocationProp)} system property (note the camelCase)."
- val ZkSslKeyStorePasswordDoc = "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
- s" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslKeyStorePasswordProp)} system property (note the camelCase)." +
- " Note that ZooKeeper does not support a key password different from the keystore password, so be sure to set the key password in the keystore to be identical to the keystore password; otherwise the connection attempt to Zookeeper will fail."
- val ZkSslKeyStoreTypeDoc = "Keystore type when using a client-side certificate with TLS connectivity to ZooKeeper." +
- s" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreTypeProp)} system property (note the camelCase)." +
- " The default value of null means the type will be auto-detected based on the filename extension of the keystore."
- val ZkSslTrustStoreLocationDoc = "Truststore location when using TLS connectivity to ZooKeeper." +
- s" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreLocationProp)} system property (note the camelCase)."
- val ZkSslTrustStorePasswordDoc = "Truststore password when using TLS connectivity to ZooKeeper." +
- s" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslTrustStorePasswordProp)} system property (note the camelCase)."
- val ZkSslTrustStoreTypeDoc = "Truststore type when using TLS connectivity to ZooKeeper." +
- s" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreTypeProp)} system property (note the camelCase)." +
- " The default value of null means the type will be auto-detected based on the filename extension of the truststore."
- val ZkSslProtocolDoc = "Specifies the protocol to be used in ZooKeeper TLS negotiation." +
- s" An explicit value overrides any value set via the same-named ${ZkSslConfigToSystemPropertyMap(ZkSslProtocolProp)} system property."
- val ZkSslEnabledProtocolsDoc = "Specifies the enabled protocol(s) in ZooKeeper TLS negotiation (csv)." +
- s" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslEnabledProtocolsProp)} system property (note the camelCase)." +
- s" The default value of null means the enabled protocol will be the value of the ${KafkaConfig.ZkSslProtocolProp} configuration property."
- val ZkSslCipherSuitesDoc = "Specifies the enabled cipher suites to be used in ZooKeeper TLS negotiation (csv)." +
- s""" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslCipherSuitesProp)} system property (note the single word \"ciphersuites\").""" +
- " The default value of null means the list of enabled cipher suites is determined by the Java runtime being used."
- val ZkSslEndpointIdentificationAlgorithmDoc = "Specifies whether to enable hostname verification in the ZooKeeper TLS negotiation process, with (case-insensitively) \"https\" meaning ZooKeeper hostname verification is enabled and an explicit blank value meaning it is disabled (disabling it is only recommended for testing purposes)." +
- s""" An explicit value overrides any \"true\" or \"false\" value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslEndpointIdentificationAlgorithmProp)} system property (note the different name and values; true implies https and false implies blank)."""
- val ZkSslCrlEnableDoc = "Specifies whether to enable Certificate Revocation List in the ZooKeeper TLS protocols." +
- s" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslCrlEnableProp)} system property (note the shorter name)."
- val ZkSslOcspEnableDoc = "Specifies whether to enable Online Certificate Status Protocol in the ZooKeeper TLS protocols." +
- s" Overrides any explicit value set via the ${ZkSslConfigToSystemPropertyMap(ZkSslOcspEnableProp)} system property (note the shorter name)."
/** ********* General Configuration ***********/
val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
@@ -943,25 +860,25 @@ object KafkaConfig {
new ConfigDef()
/** ********* Zookeeper Configuration ***********/
- .define(ZkConnectProp, STRING, null, HIGH, ZkConnectDoc)
- .define(ZkSessionTimeoutMsProp, INT, Defaults.ZK_SESSION_TIMEOUT_MS, HIGH, ZkSessionTimeoutMsDoc)
- .define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc)
- .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZK_ENABLE_SECURE_ACLS, HIGH, ZkEnableSecureAclsDoc)
- .define(ZkMaxInFlightRequestsProp, INT, Defaults.ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZkMaxInFlightRequestsDoc)
- .define(ZkSslClientEnableProp, BOOLEAN, Defaults.ZK_SSL_CLIENT_ENABLE, MEDIUM, ZkSslClientEnableDoc)
- .define(ZkClientCnxnSocketProp, STRING, null, MEDIUM, ZkClientCnxnSocketDoc)
- .define(ZkSslKeyStoreLocationProp, STRING, null, MEDIUM, ZkSslKeyStoreLocationDoc)
- .define(ZkSslKeyStorePasswordProp, PASSWORD, null, MEDIUM, ZkSslKeyStorePasswordDoc)
- .define(ZkSslKeyStoreTypeProp, STRING, null, MEDIUM, ZkSslKeyStoreTypeDoc)
- .define(ZkSslTrustStoreLocationProp, STRING, null, MEDIUM, ZkSslTrustStoreLocationDoc)
- .define(ZkSslTrustStorePasswordProp, PASSWORD, null, MEDIUM, ZkSslTrustStorePasswordDoc)
- .define(ZkSslTrustStoreTypeProp, STRING, null, MEDIUM, ZkSslTrustStoreTypeDoc)
- .define(ZkSslProtocolProp, STRING, Defaults.ZK_SSL_PROTOCOL, LOW, ZkSslProtocolDoc)
- .define(ZkSslEnabledProtocolsProp, LIST, null, LOW, ZkSslEnabledProtocolsDoc)
- .define(ZkSslCipherSuitesProp, LIST, null, LOW, ZkSslCipherSuitesDoc)
- .define(ZkSslEndpointIdentificationAlgorithmProp, STRING, Defaults.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, ZkSslEndpointIdentificationAlgorithmDoc)
- .define(ZkSslCrlEnableProp, BOOLEAN, Defaults.ZK_SSL_CRL_ENABLE, LOW, ZkSslCrlEnableDoc)
- .define(ZkSslOcspEnableProp, BOOLEAN, Defaults.ZK_SSL_OCSP_ENABLE, LOW, ZkSslOcspEnableDoc)
+ .define(ZkConfigs.ZK_CONNECT_PROP, STRING, null, HIGH, ZkConfigs.ZK_CONNECT_DOC)
+ .define(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, INT, ZkConfigs.ZK_SESSION_TIMEOUT_MS, HIGH, ZkConfigs.ZK_SESSION_TIMEOUT_MS_DOC)
+ .define(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, INT, null, HIGH, ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_DOC)
+ .define(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, BOOLEAN, ZkConfigs.ZK_ENABLE_SECURE_ACLS, HIGH, ZkConfigs.ZK_ENABLE_SECURE_ACLS_DOC)
+ .define(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP, INT, ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_DOC)
+ .define(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, BOOLEAN, ZkConfigs.ZK_SSL_CLIENT_ENABLE, MEDIUM, ZkConfigs.ZK_SSL_CLIENT_ENABLE_DOC)
+ .define(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_DOC)
+ .define(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_DOC)
+ .define(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, PASSWORD, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_DOC)
+ .define(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_KEY_STORE_TYPE_DOC)
+ .define(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_DOC)
+ .define(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, PASSWORD, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_DOC)
+ .define(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, STRING, null, MEDIUM, ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_DOC)
+ .define(ZkConfigs.ZK_SSL_PROTOCOL_PROP, STRING, ZkConfigs.ZK_SSL_PROTOCOL, LOW, ZkConfigs.ZK_SSL_PROTOCOL_DOC)
+ .define(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, LIST, null, LOW, ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_DOC)
+ .define(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, LIST, null, LOW, ZkConfigs.ZK_SSL_CIPHER_SUITES_DOC)
+ .define(ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, STRING, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, LOW, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
+ .define(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, BOOLEAN, ZkConfigs.ZK_SSL_CRL_ENABLE, LOW, ZkConfigs.ZK_SSL_CRL_ENABLE_DOC)
+ .define(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, BOOLEAN, ZkConfigs.ZK_SSL_OCSP_ENABLE, LOW, ZkConfigs.ZK_SSL_OCSP_ENABLE_DOC)
/** ********* General Configuration ***********/
.define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BROKER_ID_GENERATION_ENABLE, MEDIUM, BrokerIdGenerationEnableDoc)
@@ -1417,12 +1334,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
super.valuesWithPrefixOverride(prefix)
/** ********* Zookeeper Configuration ***********/
- val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
- val zkSessionTimeoutMs: Int = getInt(KafkaConfig.ZkSessionTimeoutMsProp)
+ val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_PROP)
+ val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP)
val zkConnectionTimeoutMs: Int =
- Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
- val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
- val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
+ Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP))
+ val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP)
+ val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP)
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig
@@ -1470,21 +1387,21 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
}
- val zkSslClientEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslClientEnableProp)
- val zkClientCnxnSocketClassName = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkClientCnxnSocketProp)
- val zkSslKeyStoreLocation = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreLocationProp)
- val zkSslKeyStorePassword = zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslKeyStorePasswordProp)
- val zkSslKeyStoreType = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreTypeProp)
- val zkSslTrustStoreLocation = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreLocationProp)
- val zkSslTrustStorePassword = zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslTrustStorePasswordProp)
- val zkSslTrustStoreType = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreTypeProp)
- val ZkSslProtocol = zkStringConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslProtocolProp)
- val ZkSslEnabledProtocols = zkListConfigOrSystemProperty(KafkaConfig.ZkSslEnabledProtocolsProp)
- val ZkSslCipherSuites = zkListConfigOrSystemProperty(KafkaConfig.ZkSslCipherSuitesProp)
+ val zkSslClientEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP)
+ val zkClientCnxnSocketClassName = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP)
+ val zkSslKeyStoreLocation = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP)
+ val zkSslKeyStorePassword = zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP)
+ val zkSslKeyStoreType = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP)
+ val zkSslTrustStoreLocation = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP)
+ val zkSslTrustStorePassword = zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP)
+ val zkSslTrustStoreType = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP)
+ val ZkSslProtocol = zkStringConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_PROTOCOL_PROP)
+ val ZkSslEnabledProtocols = zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP)
+ val ZkSslCipherSuites = zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP)
val ZkSslEndpointIdentificationAlgorithm = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false to HTTPS/
- val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
+ val kafkaProp = ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP
val actuallyProvided = originals.containsKey(kafkaProp)
if (actuallyProvided)
getString(kafkaProp)
@@ -1496,8 +1413,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
}
}
- val ZkSslCrlEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslCrlEnableProp)
- val ZkSslOcspEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslOcspEnableProp)
+ val ZkSslCrlEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP)
+ val ZkSslOcspEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP)
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
@@ -2040,7 +1957,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
if (requiresZookeeper) {
if (zkConnect == null) {
- throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
+ throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_PROP}` which has no default value.")
}
if (brokerIdGenerationEnable) {
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
@@ -2055,7 +1972,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
if (migrationEnabled) {
if (zkConnect == null) {
- throw new ConfigException(s"If using `${KafkaConfig.MigrationEnabledProp}` in KRaft mode, `${KafkaConfig.ZkConnectProp}` must also be set.")
+ throw new ConfigException(s"If using `${KafkaConfig.MigrationEnabledProp}` in KRaft mode, `${ZkConfigs.ZK_CONNECT_PROP}` must also be set.")
}
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 55c773b0a46..fce61e17a22 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -57,7 +57,7 @@ import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.fault.LoggingFaultHandler
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -80,20 +80,20 @@ object KafkaServer {
def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
val clientConfig = new ZKClientConfig
if (config.zkSslClientEnable || forceZkSslClientEnable) {
- KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true")
- config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, _))
- config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, _))
- config.zkSslKeyStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStorePasswordProp, x.value))
- config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreTypeProp, _))
- config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStoreLocationProp, _))
- config.zkSslTrustStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStorePasswordProp, x.value))
- config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStoreTypeProp, _))
- KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslProtocolProp, config.ZkSslProtocol)
- config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEnabledProtocolsProp, _))
- config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCipherSuitesProp, _))
- KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, config.ZkSslEndpointIdentificationAlgorithm)
- KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString)
- KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString)
+ KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "true")
+ config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP, _))
+ config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, _))
+ config.zkSslKeyStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, x.value))
+ config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, _))
+ config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, _))
+ config.zkSslTrustStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, x.value))
+ config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, _))
+ KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_PROTOCOL_PROP, config.ZkSslProtocol)
+ config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, _))
+ config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, _))
+ KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, config.ZkSslEndpointIdentificationAlgorithm)
+ KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, config.ZkSslCrlEnable.toString)
+ KafkaConfig.setZooKeeperClientProperty(clientConfig, ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, config.ZkSslOcspEnable.toString)
}
// The zk sasl is enabled by default so it can produce false error when broker does not intend to use SASL.
if (!JaasUtils.isZkSaslEnabled) clientConfig.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 26c9453bef2..749de428533 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
@@ -2349,9 +2349,9 @@ object KafkaZkClient {
if (secureAclsEnabled && !isZkSecurityEnabled)
throw new java.lang.SecurityException(
- s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but ZooKeeper client TLS configuration identifying at least " +
- s"${KafkaConfig.ZkSslClientEnableProp}, ${KafkaConfig.ZkClientCnxnSocketProp}, and " +
- s"${KafkaConfig.ZkSslKeyStoreLocationProp} was not present and the verification of the JAAS login file failed " +
+ s"${ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP} is true, but ZooKeeper client TLS configuration identifying at least " +
+ s"${ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP}, ${ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP}, and " +
+ s"${ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP} was not present and the verification of the JAAS login file failed " +
s"${JaasUtils.zkSecuritySysConfigString}")
KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 6ea0c2eaec0..77d0784b1a5 100644
--- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@@ -75,7 +76,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
import DescribeAuthorizedOperationsTest._
override val brokerCount = 1
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
var client: Admin = _
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 46e674c00aa..061ba3de9ea 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -18,9 +18,9 @@
package kafka.api
import com.yammer.metrics.core.Gauge
+
import java.util.{Collections, Properties}
import java.util.concurrent.ExecutionException
-
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
@@ -38,11 +38,12 @@ import org.apache.kafka.common.resource._
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth._
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{ValueSource, CsvSource}
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import scala.jdk.CollectionConverters._
@@ -156,7 +157,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
} else {
// The next two configuration parameters enable ZooKeeper secure ACLs
// and sets the Kafka authorizer, both necessary to enable security.
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName)
// Set the specific principal that can update ACLs.
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index cb4b5b96979..658b57cddd3 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.TestJaasConfig
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -43,7 +44,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
private val kafkaServerJaasEntryName =
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+ this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "false")
this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
this.serverConfig.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "2.8")
this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index e084454f5ff..fe32106877a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
-import org.apache.kafka.server.config.Defaults
+import org.apache.kafka.server.config.{Defaults, ZkConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
@@ -2707,7 +2707,7 @@ object PlaintextAdminIntegrationTest {
var topicConfigEntries2 = Seq(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")).asJava
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, test.brokers.head.config.brokerId.toString)
- val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp, "localhost:2181")).asJava
+ val brokerConfigEntries = Seq(new ConfigEntry(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")).asJava
// Alter configs: first and third are invalid, second is valid
var alterResult = admin.alterConfigs(Map(
diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index 6fc3fb9b8a1..4cf7a320ff2 100644
--- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -12,10 +12,10 @@
*/
package kafka.api
-import kafka.server.KafkaConfig
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import kafka.utils.{JaasTestUtils, TestInfoUtils, TestUtils}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
private val kafkaClientSaslMechanism = "PLAIN"
private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 6a03d51aa36..f039e16faa3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -13,10 +13,10 @@
package kafka.api
import java.util.Locale
-import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
@Timeout(600)
@@ -26,7 +26,7 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
private val kafkaServerJaasEntryName =
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
+ this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "false")
// disable secure acls of zkClient in QuorumTestHarness
override protected def zkAclsEnabled = Some(false)
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 29b51713257..1d2f4eb8bbf 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@@ -46,7 +47,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], classOf[AclAuthorizer])
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
index 0ea458e25aa..19b6e9e4614 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
@@ -12,14 +12,14 @@
*/
package kafka.api
-import kafka.server.KafkaConfig
import kafka.utils.{JaasTestUtils, TestUtils}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
@Timeout(600)
class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 4e6ae3f8132..ff6ea03a677 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -14,10 +14,8 @@ package kafka.api
import java.util
import java.util.concurrent._
-
import com.yammer.metrics.core.Gauge
import kafka.security.authorizer.AclAuthorizer
-import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateAclsResult}
import org.apache.kafka.common.acl._
@@ -25,6 +23,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer._
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, Test}
@@ -80,7 +79,7 @@ object SslAdminIntegrationTest {
class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
override val authorizationAdmin = new AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], classOf[AclAuthorizer])
- this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
override protected def securityProtocol = SecurityProtocol.SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c984eae0272..dd85bd41cb0 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -60,7 +60,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.security.PasswordEncoder
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.ShutdownableThread
@@ -122,7 +122,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
properties
} else {
val properties = TestUtils.createBrokerConfig(brokerId, zkConnect)
- properties.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ properties.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
properties
}
props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index d4a1e8f1dd1..7bf86259230 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -25,6 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.Uuid
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions.{assertThrows, fail}
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{Tag, Timeout}
@@ -61,7 +62,7 @@ class KafkaServerKRaftRegistrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
- .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@@ -99,7 +100,7 @@ class KafkaServerKRaftRegistrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
- .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 4d7ddc15cb3..16e1a290278 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.coordinator.group.OffsetConfig
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@@ -79,7 +80,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," +
s"$External:PLAINTEXT, $SecureExternal:SASL_SSL")
props.put(KafkaConfig.InterBrokerListenerNameProp, Internal)
- props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ props.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP, "true")
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism)
props.put(s"${new ListenerName(SecureInternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}",
kafkaServerSaslMechanisms(SecureInternal).mkString(","))
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index a3e5ecc0814..ff8d2b797f3 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -48,7 +48,7 @@ import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
import org.junit.jupiter.api.{Assumptions, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
@@ -177,7 +177,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
- .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@@ -309,7 +309,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
- .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@@ -443,7 +443,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
- .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@@ -508,7 +508,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
- .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@@ -576,7 +576,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
- .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@@ -636,7 +636,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
- .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
@@ -711,7 +711,7 @@ class ZkMigrationIntegrationTest {
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
- .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+ .setConfigProp(ZkConfigs.ZK_CONNECT_PROP, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
.build()
try {
kraftCluster.format()
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 4cec0d05e5b..3d97e7df171 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -25,6 +25,7 @@ import kafka.utils.TestUtils.assertBadConfigContainingMessage
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
@@ -155,7 +156,7 @@ class KafkaTest {
"Missing required configuration `zookeeper.connect` which has no default value.")
// Ensure that no exception is thrown once zookeeper.connect is defined (and we clear controller.listener.names)
- propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ propertiesFile.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
propertiesFile.setProperty(KafkaConfig.ControllerListenerNamesProp, "")
KafkaConfig.fromProps(propertiesFile)
}
@@ -232,61 +233,61 @@ class KafkaTest {
@Test
def testZkSslClientEnable(): Unit = {
- testZkConfig(KafkaConfig.ZkSslClientEnableProp, "zookeeper.ssl.client.enable",
+ testZkConfig(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "zookeeper.ssl.client.enable",
"zookeeper.client.secure", booleanPropValueToSet, config => Some(config.zkSslClientEnable), booleanPropValueToSet, Some(false))
}
@Test
def testZkSslKeyStoreLocation(): Unit = {
- testZkConfig(KafkaConfig.ZkSslKeyStoreLocationProp, "zookeeper.ssl.keystore.location",
+ testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP, "zookeeper.ssl.keystore.location",
"zookeeper.ssl.keyStore.location", stringPropValueToSet, config => config.zkSslKeyStoreLocation, stringPropValueToSet)
}
@Test
def testZkSslTrustStoreLocation(): Unit = {
- testZkConfig(KafkaConfig.ZkSslTrustStoreLocationProp, "zookeeper.ssl.truststore.location",
+ testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP, "zookeeper.ssl.truststore.location",
"zookeeper.ssl.trustStore.location", stringPropValueToSet, config => config.zkSslTrustStoreLocation, stringPropValueToSet)
}
@Test
def testZookeeperKeyStorePassword(): Unit = {
- testZkConfig(KafkaConfig.ZkSslKeyStorePasswordProp, "zookeeper.ssl.keystore.password",
+ testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP, "zookeeper.ssl.keystore.password",
"zookeeper.ssl.keyStore.password", passwordPropValueToSet, config => config.zkSslKeyStorePassword, new Password(passwordPropValueToSet))
}
@Test
def testZookeeperTrustStorePassword(): Unit = {
- testZkConfig(KafkaConfig.ZkSslTrustStorePasswordProp, "zookeeper.ssl.truststore.password",
+ testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP, "zookeeper.ssl.truststore.password",
"zookeeper.ssl.trustStore.password", passwordPropValueToSet, config => config.zkSslTrustStorePassword, new Password(passwordPropValueToSet))
}
@Test
def testZkSslKeyStoreType(): Unit = {
- testZkConfig(KafkaConfig.ZkSslKeyStoreTypeProp, "zookeeper.ssl.keystore.type",
+ testZkConfig(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP, "zookeeper.ssl.keystore.type",
"zookeeper.ssl.keyStore.type", stringPropValueToSet, config => config.zkSslKeyStoreType, stringPropValueToSet)
}
@Test
def testZkSslTrustStoreType(): Unit = {
- testZkConfig(KafkaConfig.ZkSslTrustStoreTypeProp, "zookeeper.ssl.truststore.type",
+ testZkConfig(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP, "zookeeper.ssl.truststore.type",
"zookeeper.ssl.trustStore.type", stringPropValueToSet, config => config.zkSslTrustStoreType, stringPropValueToSet)
}
@Test
def testZkSslProtocol(): Unit = {
- testZkConfig(KafkaConfig.ZkSslProtocolProp, "zookeeper.ssl.protocol",
+ testZkConfig(ZkConfigs.ZK_SSL_PROTOCOL_PROP, "zookeeper.ssl.protocol",
"zookeeper.ssl.protocol", stringPropValueToSet, config => Some(config.ZkSslProtocol), stringPropValueToSet, Some("TLSv1.2"))
}
@Test
def testZkSslEnabledProtocols(): Unit = {
- testZkConfig(KafkaConfig.ZkSslEnabledProtocolsProp, "zookeeper.ssl.enabled.protocols",
+ testZkConfig(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP, "zookeeper.ssl.enabled.protocols",
"zookeeper.ssl.enabledProtocols", listPropValueToSet.mkString(","), config => config.ZkSslEnabledProtocols, listPropValueToSet.asJava)
}
@Test
def testZkSslCipherSuites(): Unit = {
- testZkConfig(KafkaConfig.ZkSslCipherSuitesProp, "zookeeper.ssl.cipher.suites",
+ testZkConfig(ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP, "zookeeper.ssl.cipher.suites",
"zookeeper.ssl.ciphersuites", listPropValueToSet.mkString(","), config => config.ZkSslCipherSuites, listPropValueToSet.asJava)
}
@@ -294,7 +295,7 @@ class KafkaTest {
def testZkSslEndpointIdentificationAlgorithm(): Unit = {
// this property is different than the others
// because the system property values and the Kafka property values don't match
- val kafkaPropName = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
+ val kafkaPropName = ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP
assertEquals("zookeeper.ssl.endpoint.identification.algorithm", kafkaPropName)
val sysProp = "zookeeper.ssl.hostnameVerification"
val expectedDefaultValue = "HTTPS"
@@ -327,13 +328,13 @@ class KafkaTest {
@Test
def testZkSslCrlEnable(): Unit = {
- testZkConfig(KafkaConfig.ZkSslCrlEnableProp, "zookeeper.ssl.crl.enable",
+ testZkConfig(ZkConfigs.ZK_SSL_CRL_ENABLE_PROP, "zookeeper.ssl.crl.enable",
"zookeeper.ssl.crl", booleanPropValueToSet, config => Some(config.ZkSslCrlEnable), booleanPropValueToSet, Some(false))
}
@Test
def testZkSslOcspEnable(): Unit = {
- testZkConfig(KafkaConfig.ZkSslOcspEnableProp, "zookeeper.ssl.ocsp.enable",
+ testZkConfig(ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP, "zookeeper.ssl.ocsp.enable",
"zookeeper.ssl.ocsp", booleanPropValueToSet, config => Some(config.ZkSslOcspEnable), booleanPropValueToSet, Some(false))
}
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
index d6718e4203c..2730431565e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_0_IV1, IBP_0_10_2_IV0, IBP_0_9_0, IBP_1_0_IV0, IBP_2_2_IV0, IBP_2_4_IV0, IBP_2_4_IV1, IBP_2_6_IV0, IBP_2_8_IV1, IBP_3_2_IV0, IBP_3_4_IV0}
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -895,7 +896,7 @@ class ControllerChannelManagerTest {
private def createConfig(interBrokerVersion: MetadataVersion): KafkaConfig = {
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, controllerId.toString)
- props.put(KafkaConfig.ZkConnectProp, "zkConnect")
+ props.put(ZkConfigs.ZK_CONNECT_PROP, "zkConnect")
TestUtils.setIbpAndMessageFormatVersions(props, interBrokerVersion)
KafkaConfig.fromProps(props)
}
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
index 80790ab905f..29907f22068 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala
@@ -41,6 +41,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_2_0_IV0, IBP_2_0_IV1}
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
@@ -869,7 +870,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(noTlsProps),
noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala)
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach { propName =>
assertNull(zkClientConfig.getProperty(propName))
}
}
@@ -879,27 +880,27 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val props = new java.util.Properties()
val kafkaValue = "kafkaValue"
val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it
- KafkaConfig.ZkSslClientEnableProp -> "true",
- KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
- KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
- KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
- KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
- KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
- KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
- KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
- KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
- KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue)
+ ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true",
+ ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue)
configs.foreach { case (key, value) => props.put(key, value) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
// confirm we get all the values we expect
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
- case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop => prop match {
+ case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP =>
assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
- case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
+ case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP =>
assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
- case KafkaConfig.ZkSslProtocolProp =>
+ case ZkConfigs.ZK_SSL_PROTOCOL_PROP =>
assertEquals("TLSv1.2", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
})
@@ -910,29 +911,29 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val props = new java.util.Properties()
val kafkaValue = "kafkaValue"
val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it
- KafkaConfig.ZkSslClientEnableProp -> "true",
- KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
- KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
- KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
- KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
- KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
- KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
- KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
- KafkaConfig.ZkSslProtocolProp -> kafkaValue,
- KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
- KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue,
- KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS",
- KafkaConfig.ZkSslCrlEnableProp -> "false",
- KafkaConfig.ZkSslOcspEnableProp -> "false")
+ ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true",
+ ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_PROTOCOL_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "HTTPS",
+ ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "false",
+ ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "false")
configs.foreach{case (key, value) => props.put(key, value.toString) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
// confirm we get all the values we expect
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
- case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop => prop match {
+ case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP =>
assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
- case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
+ case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP =>
assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
})
@@ -945,43 +946,43 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val prefixedValue = "prefixedValue"
val prefix = "authorizer."
val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it
- KafkaConfig.ZkSslClientEnableProp -> "false",
- KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
- KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
- KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
- KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
- KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
- KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
- KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
- KafkaConfig.ZkSslProtocolProp -> kafkaValue,
- KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
- KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue,
- KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS",
- KafkaConfig.ZkSslCrlEnableProp -> "false",
- KafkaConfig.ZkSslOcspEnableProp -> "false",
- prefix + KafkaConfig.ZkSslClientEnableProp -> "true",
- prefix + KafkaConfig.ZkClientCnxnSocketProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslKeyStoreLocationProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslKeyStorePasswordProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslKeyStoreTypeProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslTrustStoreLocationProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslTrustStorePasswordProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslTrustStoreTypeProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslProtocolProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslEnabledProtocolsProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslCipherSuitesProp -> prefixedValue,
- prefix + KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "",
- prefix + KafkaConfig.ZkSslCrlEnableProp -> "true",
- prefix + KafkaConfig.ZkSslOcspEnableProp -> "true")
+ ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "false",
+ ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_PROTOCOL_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> kafkaValue,
+ ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "HTTPS",
+ ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "false",
+ ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "false",
+ prefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP -> "true",
+ prefix + ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_PROTOCOL_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP -> prefixedValue,
+ prefix + ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP -> "",
+ prefix + ZkConfigs.ZK_SSL_CRL_ENABLE_PROP -> "true",
+ prefix + ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP -> "true")
configs.foreach{case (key, value) => props.put(key, value.toString) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
// confirm we get all the values we expect
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
- case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop => prop match {
+ case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP =>
assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
- case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
+ case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP =>
assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
case _ => assertEquals(prefixedValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse(""))
})
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 4e1c8eed276..8e7fe06ef2e 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.config.Defaults
+import org.apache.kafka.server.config.{Defaults, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
@@ -211,7 +211,7 @@ class DynamicBrokerConfigTest {
val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix)
- val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
+ val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_PROP -> "somehost:2181")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps)
// Test update of configs with invalid type
@@ -709,7 +709,7 @@ class DynamicBrokerConfigTest {
@Test
def testNonInternalValuesDoesNotExposeInternalConfigs(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.put(KafkaConfig.MetadataLogSegmentMinBytesProp, "1024")
val config = new KafkaConfig(props)
assertFalse(config.nonInternalValues.containsKey(KafkaConfig.MetadataLogSegmentMinBytesProp))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a5d4d961fe1..fd4377f658e 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.Node
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
-import org.apache.kafka.server.config.ServerTopicConfigSynonyms
+import org.apache.kafka.server.config.{ServerTopicConfigSynonyms, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.function.Executable
@@ -155,7 +155,7 @@ class KafkaConfigTest {
val hostName = "fake-host"
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
val serverConfig = KafkaConfig.fromProps(props)
@@ -186,7 +186,7 @@ class KafkaConfigTest {
def testDuplicateListeners(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
// listeners with duplicate port
props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
@@ -212,7 +212,7 @@ class KafkaConfigTest {
def testIPv4AndIPv6SamePortListeners(): Unit = {
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://[::1]:9092,SSL://[::1]:9092")
var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
@@ -452,7 +452,7 @@ class KafkaConfigTest {
def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
assertBadConfigContainingMessage(props,
"Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER")
@@ -465,7 +465,7 @@ class KafkaConfigTest {
def testBadListenerProtocol(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "BAD://localhost:9091")
assertFalse(isValidKafkaConfig(props))
@@ -475,7 +475,7 @@ class KafkaConfigTest {
def testListenerNamesWithAdvertisedListenerUnset(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
@@ -499,7 +499,7 @@ class KafkaConfigTest {
def testListenerAndAdvertisedListenerNames(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
props.setProperty(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
@@ -530,7 +530,7 @@ class KafkaConfigTest {
def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092")
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
@@ -541,7 +541,7 @@ class KafkaConfigTest {
def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
@@ -552,7 +552,7 @@ class KafkaConfigTest {
def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
@@ -564,7 +564,7 @@ class KafkaConfigTest {
def testCaseInsensitiveListenerProtocol(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
props.setProperty(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")
val config = KafkaConfig.fromProps(props)
assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
@@ -579,7 +579,7 @@ class KafkaConfigTest {
def testListenerDefaults(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
// configuration with no listeners
val conf = KafkaConfig.fromProps(props)
@@ -593,7 +593,7 @@ class KafkaConfigTest {
def testVersionConfiguration(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.BrokerIdProp, "1")
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
val conf = KafkaConfig.fromProps(props)
assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion)
@@ -766,7 +766,7 @@ class KafkaConfigTest {
def testFromPropsInvalid(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
- validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+ validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
validRequiredProperties
}
// to ensure a basis is valid - bootstraps all needed validation
@@ -774,25 +774,25 @@ class KafkaConfigTest {
KafkaConfig.configNames.foreach { name =>
name match {
- case KafkaConfig.ZkConnectProp => // ignore string
- case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
- case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
- case KafkaConfig.ZkSslClientEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
- case KafkaConfig.ZkClientCnxnSocketProp => //ignore string
- case KafkaConfig.ZkSslKeyStoreLocationProp => //ignore string
- case KafkaConfig.ZkSslKeyStorePasswordProp => //ignore string
- case KafkaConfig.ZkSslKeyStoreTypeProp => //ignore string
- case KafkaConfig.ZkSslTrustStoreLocationProp => //ignore string
- case KafkaConfig.ZkSslTrustStorePasswordProp => //ignore string
- case KafkaConfig.ZkSslTrustStoreTypeProp => //ignore string
- case KafkaConfig.ZkSslProtocolProp => //ignore string
- case KafkaConfig.ZkSslEnabledProtocolsProp => //ignore string
- case KafkaConfig.ZkSslCipherSuitesProp => //ignore string
- case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => //ignore string
- case KafkaConfig.ZkSslCrlEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
- case KafkaConfig.ZkSslOcspEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+ case ZkConfigs.ZK_CONNECT_PROP => // ignore string
+ case ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case ZkConfigs.ZK_ENABLE_SECURE_ACLS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+ case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
+ case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+ case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_KEY_STORE_TYPE_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_PROTOCOL_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => //ignore string
+ case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+ case ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case KafkaConfig.BrokerIdProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
@@ -1051,7 +1051,7 @@ class KafkaConfigTest {
def testDynamicLogConfigs(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
- validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+ validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
validRequiredProperties
}
@@ -1141,9 +1141,9 @@ class KafkaConfigTest {
@Test
def testSpecificProperties(): Unit = {
val defaults = new Properties()
- defaults.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+ defaults.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
// For ZkConnectionTimeoutMs
- defaults.setProperty(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+ defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, "1234")
defaults.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
defaults.setProperty(KafkaConfig.MaxReservedBrokerIdProp, "1")
defaults.setProperty(KafkaConfig.BrokerIdProp, "1")
@@ -1187,7 +1187,7 @@ class KafkaConfigTest {
@Test
def testNonroutableAdvertisedListeners(): Unit = {
val props = new Properties()
- props.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:2181")
props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
assertFalse(isValidKafkaConfig(props))
}
@@ -1601,7 +1601,7 @@ class KafkaConfigTest {
@Test
def testSaslJwksEndpointRetryDefaults(): Unit = {
val props = new Properties()
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
val config = KafkaConfig.fromProps(props)
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp))
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp))
@@ -1769,7 +1769,7 @@ class KafkaConfigTest {
"If using `zookeeper.metadata.migration.enable` in KRaft mode, `zookeeper.connect` must also be set.",
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage)
- props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(ZkConfigs.ZK_CONNECT_PROP, "localhost:2181")
KafkaConfig.fromProps(props)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index bf6312f8631..035f60dab1d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -25,8 +25,11 @@ import org.junit.jupiter.api.Test
import java.util.Properties
import java.net.{InetAddress, ServerSocket}
import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
+import scala.jdk.CollectionConverters._
+
class KafkaServerTest extends QuorumTestHarness {
@Test
@@ -64,7 +67,7 @@ class KafkaServerTest extends QuorumTestHarness {
@Test
def testCreatesProperZkConfigWhenSaslDisabled(): Unit = {
val props = new Properties
- props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
+ props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
assertEquals("false", zkClientConfig.getProperty(JaasUtils.ZK_SASL_CLIENT))
}
@@ -72,10 +75,10 @@ class KafkaServerTest extends QuorumTestHarness {
@Test
def testCreatesProperZkTlsConfigWhenDisabled(): Unit = {
val props = new Properties
- props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
- props.put(KafkaConfig.ZkSslClientEnableProp, "false")
+ props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out
+ props.put(ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP, "false")
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach { propName =>
assertNull(zkClientConfig.getProperty(propName))
}
}
@@ -83,51 +86,51 @@ class KafkaServerTest extends QuorumTestHarness {
@Test
def testCreatesProperZkTlsConfigWithTrueValues(): Unit = {
val props = new Properties
- props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
+ props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out
// should get correct config for all properties if TLS is enabled
val someValue = "some_value"
def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match {
- case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true"
- case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "HTTPS"
+ case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "true"
+ case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "HTTPS"
case _ => someValue
}
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
// now check to make sure the values were set correctly
def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
- case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true"
- case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "true"
+ case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP | ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "true"
+ case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "true"
case _ => someValue
}
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
- assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp =>
+ assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaProp))))
}
@Test
def testCreatesProperZkTlsConfigWithFalseAndListValues(): Unit = {
val props = new Properties
- props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
+ props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect) // required, otherwise we would leave it out
// should get correct config for all properties if TLS is enabled
val someValue = "some_value"
def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match {
- case KafkaConfig.ZkSslClientEnableProp => "true"
- case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "false"
- case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => ""
- case KafkaConfig.ZkSslEnabledProtocolsProp | KafkaConfig.ZkSslCipherSuitesProp => "A,B"
+ case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => "true"
+ case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "false"
+ case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => ""
+ case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => "A,B"
case _ => someValue
}
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
// now check to make sure the values were set correctly
def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
- case KafkaConfig.ZkSslClientEnableProp => "true"
- case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "false"
- case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "false"
- case KafkaConfig.ZkSslEnabledProtocolsProp | KafkaConfig.ZkSslCipherSuitesProp => "A,B"
+ case ZkConfigs.ZK_SSL_CLIENT_ENABLE_PROP => "true"
+ case ZkConfigs.ZK_SSL_CRL_ENABLE_PROP | ZkConfigs.ZK_SSL_OCSP_ENABLE_PROP => "false"
+ case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP => "false"
+ case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_PROP | ZkConfigs.ZK_SSL_CIPHER_SUITES_PROP => "A,B"
case _ => someValue
}
- KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
- assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
+ ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(kafkaProp =>
+ assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaProp))))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 844701cc371..0e04bad7c8e 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
@@ -150,8 +151,8 @@ class ServerShutdownTest extends KafkaServerTestHarness {
shutdownKRaftController()
verifyCleanShutdownAfterFailedStartup[CancellationException]
} else {
- propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
- propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
+ propsToChangeUponRestart.setProperty(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, "50")
+ propsToChangeUponRestart.setProperty(ZkConfigs.ZK_CONNECT_PROP, "some.invalid.hostname.foo.bar.local:65535")
verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException]
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala
index d72ad2d7bcd..fb80c4b890d 100644
--- a/core/src/test/scala/unit/kafka/server/ServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala
@@ -20,6 +20,7 @@ import java.util.Properties
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.MetricsContext
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -54,7 +55,7 @@ class ServerTest {
val props = new Properties()
props.put(KafkaConfig.BrokerIdProp, brokerId.toString)
- props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:0")
+ props.put(ZkConfigs.ZK_CONNECT_PROP, "127.0.0.1:0")
val config = KafkaConfig.fromProps(props)
val context = Server.createKafkaMetricsContext(config, clusterId)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 44243d39ce8..33a4b27fd8b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -74,7 +74,7 @@ import org.apache.kafka.metadata.properties.MetaProperties
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.config.Defaults
+import org.apache.kafka.server.config.{Defaults, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
@@ -359,8 +359,8 @@ object TestUtils extends Logging {
// controllerQuorumVotersFuture instead.
props.put(KafkaConfig.QuorumVotersProp, "1000@localhost:0")
} else {
- props.put(KafkaConfig.ZkConnectProp, zkConnect)
- props.put(KafkaConfig.ZkConnectionTimeoutMsProp, "10000")
+ props.put(ZkConfigs.ZK_CONNECT_PROP, zkConnect)
+ props.put(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_PROP, "10000")
}
props.put(KafkaConfig.ReplicaSocketTimeoutMsProp, "1500")
props.put(KafkaConfig.ControllerSocketTimeoutMsProp, "1500")
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index a1a45784ad7..aa97f6831c3 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, ZkConfigs}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.{Code, NoAuthException, NoNodeException, NodeExistsException}
import org.apache.zookeeper.{CreateMode, ZooDefs}
@@ -106,7 +106,7 @@ class KafkaZkClientTest extends QuorumTestHarness {
// TLS connectivity itself is tested in system tests rather than here to avoid having to add TLS support
// to kafka.zk.EmbeddedZookeeper
val clientConfig = new ZKClientConfig()
- val propKey = KafkaConfig.ZkClientCnxnSocketProp
+ val propKey = ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP
val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
index 569cb5764b4..b4dccaecb0a 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationTestHarness.scala
@@ -21,6 +21,7 @@ import kafka.zk.ZkMigrationClient
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.security.PasswordEncoder
+import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import java.util.Properties
@@ -40,7 +41,7 @@ class ZkMigrationTestHarness extends QuorumTestHarness {
val encoder: PasswordEncoder = {
val encoderProps = new Properties()
- encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation
+ encoderProps.put(ZkConfigs.ZK_CONNECT_PROP, "localhost:1234") // Get around the config validation
encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the
val encoderConfig = new KafkaConfig(encoderProps)
PasswordEncoder.encrypting(encoderConfig.passwordEncoderSecret.get,
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index ebb6ccf4453..affe6c3839c 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -27,6 +27,7 @@ import kafka.utils.TestUtils
import kafka.server.QuorumTestHarness
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
@@ -102,7 +103,7 @@ class ZooKeeperClientTest extends QuorumTestHarness {
// TLS connectivity itself is tested in system tests rather than here to avoid having to add TLS support
// to kafka.zk.EmbeddedZookeeper
val clientConfig = new ZKClientConfig()
- val propKey = KafkaConfig.ZkClientCnxnSocketProp
+ val propKey = ZkConfigs.ZK_CLIENT_CNXN_SOCKET_PROP
val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
val client = newZooKeeperClient(clientConfig = clientConfig)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 75fcf3f24b6..f4d17ed0740 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ZkConfigs;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -176,7 +177,7 @@ public class MetadataRequestBenchmark {
private KafkaApis createKafkaApis() {
Properties kafkaProps = new Properties();
- kafkaProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), "zk");
+ kafkaProps.put(ZkConfigs.ZK_CONNECT_PROP, "zk");
kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
KafkaConfig config = new KafkaConfig(kafkaProps);
return new KafkaApisBuilder().
diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
index 0c7ca123b87..5b425c0dc49 100644
--- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
+++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
@@ -44,16 +44,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class Defaults {
- /** ********* Zookeeper Configuration *********/
- public static final int ZK_SESSION_TIMEOUT_MS = 18000;
- public static final boolean ZK_ENABLE_SECURE_ACLS = false;
- public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10;
- public static final boolean ZK_SSL_CLIENT_ENABLE = false;
- public static final String ZK_SSL_PROTOCOL = "TLSv1.2";
- public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS";
- public static final boolean ZK_SSL_CRL_ENABLE = false;
- public static final boolean ZK_SSL_OCSP_ENABLE = false;
-
/** ********* General Configuration *********/
public static final boolean BROKER_ID_GENERATION_ENABLE = true;
public static final int MAX_RESERVED_BROKER_ID = 1000;
diff --git a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
new file mode 100644
index 00000000000..2ddd2ef2145
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class ZkConfigs {
+ /** ********* Zookeeper Configuration ***********/
+ public static final String ZK_CONNECT_PROP = "zookeeper.connect";
+ public static final String ZK_SESSION_TIMEOUT_MS_PROP = "zookeeper.session.timeout.ms";
+ public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = "zookeeper.connection.timeout.ms";
+ public static final String ZK_ENABLE_SECURE_ACLS_PROP = "zookeeper.set.acl";
+ public static final String ZK_MAX_IN_FLIGHT_REQUESTS_PROP = "zookeeper.max.in.flight.requests";
+ public static final String ZK_SSL_CLIENT_ENABLE_PROP = "zookeeper.ssl.client.enable";
+ public static final String ZK_CLIENT_CNXN_SOCKET_PROP = "zookeeper.clientCnxnSocket";
+ public static final String ZK_SSL_KEY_STORE_LOCATION_PROP = "zookeeper.ssl.keystore.location";
+ public static final String ZK_SSL_KEY_STORE_PASSWORD_PROP = "zookeeper.ssl.keystore.password";
+ public static final String ZK_SSL_KEY_STORE_TYPE_PROP = "zookeeper.ssl.keystore.type";
+ public static final String ZK_SSL_TRUST_STORE_LOCATION_PROP = "zookeeper.ssl.truststore.location";
+ public static final String ZK_SSL_TRUST_STORE_PASSWORD_PROP = "zookeeper.ssl.truststore.password";
+ public static final String ZK_SSL_TRUST_STORE_TYPE_PROP = "zookeeper.ssl.truststore.type";
+ public static final String ZK_SSL_PROTOCOL_PROP = "zookeeper.ssl.protocol";
+ public static final String ZK_SSL_ENABLED_PROTOCOLS_PROP = "zookeeper.ssl.enabled.protocols";
+ public static final String ZK_SSL_CIPHER_SUITES_PROP = "zookeeper.ssl.cipher.suites";
+ public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP = "zookeeper.ssl.endpoint.identification.algorithm";
+ public static final String ZK_SSL_CRL_ENABLE_PROP = "zookeeper.ssl.crl.enable";
+ public static final String ZK_SSL_OCSP_ENABLE_PROP = "zookeeper.ssl.ocsp.enable";
+
+ public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form hostname:port where host and port are the " +
+ "host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " +
+ "down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.\n" +
+ "The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " +
+ "For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.";
+ public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout";
+ public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZK_SESSION_TIMEOUT_MS_PROP + " is used";
+ public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use secure ACLs";
+ public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking.";
+ public static final String ZK_SSL_CLIENT_ENABLE_DOC;
+ public static final String ZK_CLIENT_CNXN_SOCKET_DOC;
+ public static final String ZK_SSL_KEY_STORE_LOCATION_DOC;
+ public static final String ZK_SSL_KEY_STORE_PASSWORD_DOC;
+ public static final String ZK_SSL_KEY_STORE_TYPE_DOC;
+ public static final String ZK_SSL_TRUST_STORE_LOCATION_DOC;
+ public static final String ZK_SSL_TRUST_STORE_PASSWORD_DOC;
+ public static final String ZK_SSL_TRUST_STORE_TYPE_DOC;
+ public static final String ZK_SSL_PROTOCOL_DOC;
+ public static final String ZK_SSL_ENABLED_PROTOCOLS_DOC;
+ public static final String ZK_SSL_CIPHER_SUITES_DOC;
+ public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC;
+ public static final String ZK_SSL_CRL_ENABLE_DOC;
+ public static final String ZK_SSL_OCSP_ENABLE_DOC;
+
+ // a map from the Kafka config to the corresponding ZooKeeper Java system property
+ public static final Map ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP;
+
+ public static final int ZK_SESSION_TIMEOUT_MS = 18000;
+ public static final boolean ZK_ENABLE_SECURE_ACLS = false;
+ public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10;
+ public static final boolean ZK_SSL_CLIENT_ENABLE = false;
+ public static final String ZK_SSL_PROTOCOL = "TLSv1.2";
+ public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS";
+ public static final boolean ZK_SSL_CRL_ENABLE = false;
+ public static final boolean ZK_SSL_OCSP_ENABLE = false;
+
+ // See ZKClientConfig.SECURE_CLIENT
+ private static final String SECURE_CLIENT = "zookeeper.client.secure";
+ // See ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET
+ private static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
+
+ static {
+ Map zkSslConfigToSystemPropertyMap = new HashMap<>();
+
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_CLIENT_ENABLE_PROP, SECURE_CLIENT);
+ zkSslConfigToSystemPropertyMap.put(ZK_CLIENT_CNXN_SOCKET_PROP, ZOOKEEPER_CLIENT_CNXN_SOCKET);
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_LOCATION_PROP, "zookeeper.ssl.keyStore.location");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_PASSWORD_PROP, "zookeeper.ssl.keyStore.password");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_TYPE_PROP, "zookeeper.ssl.keyStore.type");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_LOCATION_PROP, "zookeeper.ssl.trustStore.location");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_PASSWORD_PROP, "zookeeper.ssl.trustStore.password");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_TYPE_PROP, "zookeeper.ssl.trustStore.type");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_PROTOCOL_PROP, "zookeeper.ssl.protocol");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENABLED_PROTOCOLS_PROP, "zookeeper.ssl.enabledProtocols");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_CIPHER_SUITES_PROP, "zookeeper.ssl.ciphersuites");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP, "zookeeper.ssl.hostnameVerification");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_CRL_ENABLE_PROP, "zookeeper.ssl.crl");
+ zkSslConfigToSystemPropertyMap.put(ZK_SSL_OCSP_ENABLE_PROP, "zookeeper.ssl.ocsp");
+
+ ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP = Collections.unmodifiableMap(zkSslConfigToSystemPropertyMap);
+
+ ZK_SSL_CLIENT_ENABLE_DOC = "Set client to use TLS when connecting to ZooKeeper." +
+ " An explicit value overrides any value set via the zookeeper.client.secure system property (note the different name)." +
+ " Defaults to false if neither is set; when true, " + ZK_CLIENT_CNXN_SOCKET_PROP + " must be set (typically to org.apache.zookeeper.ClientCnxnSocketNetty); other values to set may include " +
+ ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.keySet().stream().filter(x -> !x.equals(ZK_SSL_CLIENT_ENABLE_PROP) && !x.equals(ZK_CLIENT_CNXN_SOCKET_PROP)).sorted().collect(Collectors.joining("", ", ", ""));
+ ZK_CLIENT_CNXN_SOCKET_DOC = "Typically set to org.apache.zookeeper.ClientCnxnSocketNetty when using TLS connectivity to ZooKeeper." +
+ " Overrides any explicit value set via the same-named " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_CLIENT_CNXN_SOCKET_PROP) + " system property.";
+ ZK_SSL_KEY_STORE_LOCATION_DOC = "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_LOCATION_PROP) + " system property (note the camelCase).";
+ ZK_SSL_KEY_STORE_PASSWORD_DOC = "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_PASSWORD_PROP) + " system property (note the camelCase)." +
+ " Note that ZooKeeper does not support a key password different from the keystore password, so be sure to set the key password in the keystore to be identical to the keystore password; otherwise the connection attempt to Zookeeper will fail.";
+ ZK_SSL_KEY_STORE_TYPE_DOC = "Keystore type when using a client-side certificate with TLS connectivity to ZooKeeper." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_KEY_STORE_TYPE_PROP) + " system property (note the camelCase)." +
+ " The default value of null means the type will be auto-detected based on the filename extension of the keystore.";
+ ZK_SSL_TRUST_STORE_LOCATION_DOC = "Truststore location when using TLS connectivity to ZooKeeper." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_LOCATION_PROP) + " system property (note the camelCase).";
+ ZK_SSL_TRUST_STORE_PASSWORD_DOC = "Truststore password when using TLS connectivity to ZooKeeper." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_PASSWORD_PROP) + " system property (note the camelCase).";
+ ZK_SSL_TRUST_STORE_TYPE_DOC = "Truststore type when using TLS connectivity to ZooKeeper." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_TRUST_STORE_TYPE_PROP) + " system property (note the camelCase)." +
+ " The default value of null means the type will be auto-detected based on the filename extension of the truststore.";
+ ZK_SSL_PROTOCOL_DOC = "Specifies the protocol to be used in ZooKeeper TLS negotiation." +
+ " An explicit value overrides any value set via the same-named " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_PROTOCOL_PROP) + " system property.";
+ ZK_SSL_ENABLED_PROTOCOLS_DOC = "Specifies the enabled protocol(s) in ZooKeeper TLS negotiation (csv)." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_ENABLED_PROTOCOLS_PROP) + " system property (note the camelCase)." +
+ " The default value of null means the enabled protocol will be the value of the " + ZK_SSL_PROTOCOL_PROP + " configuration property.";
+ ZK_SSL_CIPHER_SUITES_DOC = "Specifies the enabled cipher suites to be used in ZooKeeper TLS negotiation (csv)." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_CIPHER_SUITES_PROP) + " system property (note the single word \"ciphersuites\")." +
+ " The default value of null means the list of enabled cipher suites is determined by the Java runtime being used.";
+ ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "Specifies whether to enable hostname verification in the ZooKeeper TLS negotiation process, with (case-insensitively) \"https\" meaning ZooKeeper hostname verification is enabled and an explicit blank value meaning it is disabled (disabling it is only recommended for testing purposes)." +
+ " An explicit value overrides any \"true\" or \"false\" value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP) + " system property (note the different name and values; true implies https and false implies blank).";
+ ZK_SSL_CRL_ENABLE_DOC = "Specifies whether to enable Certificate Revocation List in the ZooKeeper TLS protocols." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_CRL_ENABLE_PROP) + " system property (note the shorter name).";
+ ZK_SSL_OCSP_ENABLE_DOC = "Specifies whether to enable Online Certificate Status Protocol in the ZooKeeper TLS protocols." +
+ " Overrides any explicit value set via the " + ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(ZK_SSL_OCSP_ENABLE_PROP) + " system property (note the shorter name).";
+ }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 4232e1d74c9..89af1d5729e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.config.ConfigType;
+import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestCondition;
@@ -109,7 +110,7 @@ public class EmbeddedKafkaCluster {
zookeeper = new EmbeddedZookeeper();
log.debug("ZooKeeper instance is running at {}", zKConnectString());
- brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
+ brokerConfig.put(ZkConfigs.ZK_CONNECT_PROP, zKConnectString());
putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 7f945ffe06d..4b57d576ae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.server.util.MockTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +91,7 @@ public class KafkaEmbedded {
effectiveConfig.put(KafkaConfig.AutoCreateTopicsEnableProp(), true);
effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 1000000);
effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true);
- effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 10000);
+ effectiveConfig.put(ZkConfigs.ZK_SESSION_TIMEOUT_MS_PROP, 10000);
effectiveConfig.putAll(initialConfig);
effectiveConfig.setProperty(KafkaConfig.LogDirProp(), logDir.getAbsolutePath());