KAFKA-15853 Refactor KafkaConfig to use PasswordEncoderConfigs (#15770)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Omnia Ibrahim 2024-04-21 17:47:57 +01:00 committed by GitHub
parent 98548c517d
commit 5e96e5c898
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 80 additions and 87 deletions

View File

@ -212,14 +212,14 @@ object ConfigCommand extends Logging {
} }
private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = { private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = {
encoderConfigs.get(PasswordEncoderConfigs.SECRET) encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)
val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.SECRET, val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG,
throw new IllegalArgumentException("Password encoder secret not specified")) throw new IllegalArgumentException("Password encoder secret not specified"))
PasswordEncoder.encrypting(new Password(encoderSecret), PasswordEncoder.encrypting(new Password(encoderSecret),
null, null,
encoderConfigs.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM), encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT),
encoderConfigs.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH), encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT),
encoderConfigs.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS)) encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT))
} }
/** /**
@ -239,8 +239,8 @@ object ConfigCommand extends Logging {
DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig) DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig) val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
if (passwordConfigs.nonEmpty) { if (passwordConfigs.nonEmpty) {
require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.SECRET), require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
s"${PasswordEncoderConfigs.SECRET} must be specified to update $passwordConfigs." + s"${PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG} must be specified to update $passwordConfigs." +
" Other password encoder configs like cipher algorithm and iterations may also be specified" + " Other password encoder configs like cipher algorithm and iterations may also be specified" +
" to override the default encoding parameters. Password encoder configs will not be persisted" + " to override the default encoding parameters. Password encoder configs will not be persisted" +
" in ZooKeeper." " in ZooKeeper."

View File

@ -203,14 +203,6 @@ object KafkaConfig {
val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms" val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms"
val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms" val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
/** ********* Password encryption configuration for dynamic configs *********/
val PasswordEncoderSecretProp = PasswordEncoderConfigs.SECRET
val PasswordEncoderOldSecretProp = PasswordEncoderConfigs.OLD_SECRET
val PasswordEncoderKeyFactoryAlgorithmProp = PasswordEncoderConfigs.KEYFACTORY_ALGORITHM
val PasswordEncoderCipherAlgorithmProp = PasswordEncoderConfigs.CIPHER_ALGORITHM
val PasswordEncoderKeyLengthProp = PasswordEncoderConfigs.KEY_LENGTH
val PasswordEncoderIterationsProp = PasswordEncoderConfigs.ITERATIONS
/** Internal Configurations **/ /** Internal Configurations **/
val UnstableApiVersionsEnableProp = "unstable.api.versions.enable" val UnstableApiVersionsEnableProp = "unstable.api.versions.enable"
val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable" val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable"
@ -419,17 +411,6 @@ object KafkaConfig {
val DelegationTokenExpiryTimeMsDoc = "The token validity time in milliseconds before the token needs to be renewed. Default value 1 day." val DelegationTokenExpiryTimeMsDoc = "The token validity time in milliseconds before the token needs to be renewed. Default value 1 day."
val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens." val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens."
/** ********* Password encryption configuration for dynamic configs *********/
val PasswordEncoderSecretDoc = "The secret used for encoding dynamically configured passwords for this broker."
val PasswordEncoderOldSecretDoc = "The old secret that was used for encoding dynamically configured passwords. " +
"This is required only when the secret is updated. If specified, all dynamically encoded passwords are " +
s"decoded using this old secret and re-encoded using $PasswordEncoderSecretProp when broker starts up."
val PasswordEncoderKeyFactoryAlgorithmDoc = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " +
"Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise."
val PasswordEncoderCipherAlgorithmDoc = "The Cipher algorithm used for encoding dynamically configured passwords."
val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."
@nowarn("cat=deprecation") @nowarn("cat=deprecation")
val configDef = { val configDef = {
import ConfigDef.Importance._ import ConfigDef.Importance._
@ -765,12 +746,12 @@ object KafkaConfig {
.define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc) .define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc)
/** ********* Password encryption configuration for dynamic configs *********/ /** ********* Password encryption configuration for dynamic configs *********/
.define(PasswordEncoderSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderSecretDoc) .define(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_DOC)
.define(PasswordEncoderOldSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderOldSecretDoc) .define(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_DOC)
.define(PasswordEncoderKeyFactoryAlgorithmProp, STRING, null, LOW, PasswordEncoderKeyFactoryAlgorithmDoc) .define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, STRING, null, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC)
.define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, LOW, PasswordEncoderCipherAlgorithmDoc) .define(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, STRING, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC)
.define(PasswordEncoderKeyLengthProp, INT, Defaults.PASSWORD_ENCODER_KEY_LENGTH, atLeast(8), LOW, PasswordEncoderKeyLengthDoc) .define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, atLeast(8), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DOC)
.define(PasswordEncoderIterationsProp, INT, Defaults.PASSWORD_ENCODER_ITERATIONS, atLeast(1024), LOW, PasswordEncoderIterationsDoc) .define(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT, atLeast(1024), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DOC)
/** ********* Raft Quorum Configuration *********/ /** ********* Raft Quorum Configuration *********/
.define(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QUORUM_VOTERS, new RaftConfig.ControllerQuorumVotersValidator(), HIGH, RaftConfig.QUORUM_VOTERS_DOC) .define(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QUORUM_VOTERS, new RaftConfig.ControllerQuorumVotersValidator(), HIGH, RaftConfig.QUORUM_VOTERS_DOC)
@ -1349,12 +1330,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp) val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
/** ********* Password encryption configuration for dynamic configs *********/ /** ********* Password encryption configuration for dynamic configs *********/
def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp)) def passwordEncoderSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG))
def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp)) def passwordEncoderOldSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG))
def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp) def passwordEncoderCipherAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG)
def passwordEncoderKeyFactoryAlgorithm = getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp) def passwordEncoderKeyFactoryAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG)
def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp) def passwordEncoderKeyLength = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG)
def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp) def passwordEncoderIterations = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG)
/** ********* Quota Configuration **************/ /** ********* Quota Configuration **************/
val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp) val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp)

View File

@ -18,12 +18,13 @@ package kafka.admin
import kafka.admin.ConfigCommand.ConfigCommandOptions import kafka.admin.ConfigCommand.ConfigCommandOptions
import kafka.cluster.{Broker, EndPoint} import kafka.cluster.{Broker, EndPoint}
import kafka.server.{KafkaConfig, QuorumTestHarness} import kafka.server.QuorumTestHarness
import kafka.utils.{Exit, Logging} import kafka.utils.{Exit, Logging}
import kafka.zk.{AdminZkClient, BrokerInfo} import kafka.zk.{AdminZkClient, BrokerInfo}
import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ZooKeeperInternals import org.apache.kafka.server.config.ZooKeeperInternals
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
@ -134,10 +135,10 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
// Password config update with encoder secret should succeed and encoded password must be stored in ZK // Password config update with encoder secret should succeed and encoded password must be stored in ZK
val configs = Map("listener.name.external.ssl.keystore.password" -> "secret", "log.cleaner.threads" -> "2") val configs = Map("listener.name.external.ssl.keystore.password" -> "secret", "log.cleaner.threads" -> "2")
val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret") val encoderConfigs = Map(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG -> "encoder-secret")
alterConfigWithZk(configs, Some(brokerId), encoderConfigs) alterConfigWithZk(configs, Some(brokerId), encoderConfigs)
val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId) val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId)
assertFalse(brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp), "Encoder secret stored in ZooKeeper") assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper")
assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not encoded assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not encoded
val encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password") val encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password")
val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs) val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs)
@ -146,11 +147,11 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
// Password config update with overrides for encoder parameters // Password config update with overrides for encoder parameters
val configs2 = Map("listener.name.internal.ssl.keystore.password" -> "secret2") val configs2 = Map("listener.name.internal.ssl.keystore.password" -> "secret2")
val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret", val encoderConfigs2 = Map(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG -> "encoder-secret",
KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG -> "DES/CBC/PKCS5Padding",
KafkaConfig.PasswordEncoderIterationsProp -> "1024", PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG -> "1024",
KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp -> "PBKDF2WithHmacSHA1", PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG -> "PBKDF2WithHmacSHA1",
KafkaConfig.PasswordEncoderKeyLengthProp -> "64") PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG -> "64")
alterConfigWithZk(configs2, Some(brokerId), encoderConfigs2) alterConfigWithZk(configs2, Some(brokerId), encoderConfigs2)
val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId) val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId)
val encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password") val encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password")

View File

@ -60,7 +60,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.security.PasswordEncoder import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ZkConfigs} import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ZkConfigs}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs} import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
@ -137,7 +137,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.put(KafkaSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(",")) props.put(KafkaSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "2000") // low value to test log rolling on config update props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "2000") // low value to test log rolling on config update
props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads
props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "dynamic-config-secret")
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString) props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString)
props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString) props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString)
@ -1117,7 +1117,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties] val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties]
val config = server.config val config = server.config
val oldSecret = "old-dynamic-config-secret" val oldSecret = "old-dynamic-config-secret"
config.dynamicConfig.staticBrokerConfigs.put(KafkaConfig.PasswordEncoderOldSecretProp, oldSecret) config.dynamicConfig.staticBrokerConfigs.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, oldSecret)
val passwordConfigs = props.asScala.filter { case (k, _) => DynamicBrokerConfig.isPasswordConfig(k) } val passwordConfigs = props.asScala.filter { case (k, _) => DynamicBrokerConfig.isPasswordConfig(k) }
assertTrue(passwordConfigs.nonEmpty, "Password configs not found") assertTrue(passwordConfigs.nonEmpty, "Password configs not found")
val passwordDecoder = createPasswordEncoder(config, config.passwordEncoderSecret) val passwordDecoder = createPasswordEncoder(config, config.passwordEncoderSecret)
@ -1595,7 +1595,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val externalListenerPrefix = listenerPrefix(SecureExternal) val externalListenerPrefix = listenerPrefix(SecureExternal)
val sslStoreProps = new Properties val sslStoreProps = new Properties
sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, externalListenerPrefix) sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, externalListenerPrefix)
sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull) sslStoreProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
val entityType = ConfigType.BROKER val entityType = ConfigType.BROKER
@ -1610,7 +1610,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
sslStoreProps.setProperty(configName, encodedValue) sslStoreProps.setProperty(configName, encodedValue)
} }
} }
sslStoreProps.remove(KafkaConfig.PasswordEncoderSecretProp) sslStoreProps.remove(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)
adminZkClient.changeConfigs(entityType, entityName, sslStoreProps) adminZkClient.changeConfigs(entityType, entityName, sslStoreProps)
val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString) val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString)

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs} import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{Defaults, KafkaSecurityConfigs, ZkConfigs} import org.apache.kafka.server.config.{Defaults, KafkaSecurityConfigs, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@ -330,7 +331,7 @@ class DynamicBrokerConfigTest {
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = { private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = {
val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret") configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret")
val config = KafkaConfig(configProps) val config = KafkaConfig(configProps)
config.dynamicConfig.initialize(None, None) config.dynamicConfig.initialize(None, None)
@ -381,7 +382,7 @@ class DynamicBrokerConfigTest {
def testPasswordConfigEncryption(): Unit = { def testPasswordConfigEncryption(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val configWithoutSecret = KafkaConfig(props) val configWithoutSecret = KafkaConfig(props)
props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret") props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val configWithSecret = KafkaConfig(props) val configWithSecret = KafkaConfig(props)
val dynamicProps = new Properties val dynamicProps = new Properties
dynamicProps.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "myLoginModule required;") dynamicProps.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "myLoginModule required;")
@ -402,7 +403,7 @@ class DynamicBrokerConfigTest {
def testPasswordConfigEncoderSecretChange(): Unit = { def testPasswordConfigEncoderSecretChange(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;") props.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;")
props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret") props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val config = KafkaConfig(props) val config = KafkaConfig(props)
config.dynamicConfig.initialize(None, None) config.dynamicConfig.initialize(None, None)
val dynamicProps = new Properties val dynamicProps = new Properties
@ -421,14 +422,14 @@ class DynamicBrokerConfigTest {
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
// New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig // New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig
props.put(KafkaConfig.PasswordEncoderSecretProp, "new-encoder-secret") props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "new-encoder-secret")
props.put(KafkaConfig.PasswordEncoderOldSecretProp, "config-encoder-secret") props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, "config-encoder-secret")
val newConfigWithNewAndOldSecret = KafkaConfig(props) val newConfigWithNewAndOldSecret = KafkaConfig(props)
newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
// New config with new secret alone should revert to static password config since dynamic config cannot be decoded // New config with new secret alone should revert to static password config since dynamic config cannot be decoded
props.put(KafkaConfig.PasswordEncoderSecretProp, "another-new-encoder-secret") props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "another-new-encoder-secret")
val newConfigWithNewSecret = KafkaConfig(props) val newConfigWithNewSecret = KafkaConfig(props)
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps) newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value) assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

View File

@ -39,9 +39,10 @@ import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
import org.apache.kafka.server.config.{KafkaSecurityConfigs, ServerTopicConfigSynonyms, ZkConfigs, ReplicationConfigs, ServerLogConfigs} import org.apache.kafka.server.config.{KafkaSecurityConfigs, ReplicationConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.function.Executable
@ -980,12 +981,12 @@ class KafkaConfigTest {
case KafkaSecurityConfigs.SECURITY_PROVIDER_CLASS_CONFIG => case KafkaSecurityConfigs.SECURITY_PROVIDER_CLASS_CONFIG =>
// Password encoder configs // Password encoder configs
case KafkaConfig.PasswordEncoderSecretProp => case PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG =>
case KafkaConfig.PasswordEncoderOldSecretProp => case PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG =>
case KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp => case PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG =>
case KafkaConfig.PasswordEncoderCipherAlgorithmProp => case PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG =>
case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") case PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0") case PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
//delegation token configs //delegation token configs
case KafkaConfig.DelegationTokenSecretKeyAliasProp => // ignore case KafkaConfig.DelegationTokenSecretKeyAliasProp => // ignore

View File

@ -20,7 +20,7 @@ import kafka.server.{KafkaConfig, QuorumTestHarness}
import kafka.zk.ZkMigrationClient import kafka.zk.ZkMigrationClient
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.security.PasswordEncoder import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.config.ZkConfigs import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{BeforeEach, TestInfo} import org.junit.jupiter.api.{BeforeEach, TestInfo}
@ -42,7 +42,7 @@ class ZkMigrationTestHarness extends QuorumTestHarness {
val encoder: PasswordEncoder = { val encoder: PasswordEncoder = {
val encoderProps = new Properties() val encoderProps = new Properties()
encoderProps.put(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:1234") // Get around the config validation encoderProps.put(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:1234") // Get around the config validation
encoderProps.put(KafkaConfig.PasswordEncoderSecretProp, SECRET) // Zk secret to encrypt the encoderProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, SECRET) // Zk secret to encrypt the
val encoderConfig = new KafkaConfig(encoderProps) val encoderConfig = new KafkaConfig(encoderProps)
PasswordEncoder.encrypting(encoderConfig.passwordEncoderSecret.get, PasswordEncoder.encrypting(encoderConfig.passwordEncoderSecret.get,
encoderConfig.passwordEncoderKeyFactoryAlgorithm, encoderConfig.passwordEncoderKeyFactoryAlgorithm,

View File

@ -18,13 +18,27 @@ package org.apache.kafka.security;
public class PasswordEncoderConfigs { public class PasswordEncoderConfigs {
public static final String SECRET = "password.encoder.secret"; public static final String PASSWORD_ENCODER_SECRET_CONFIG = "password.encoder.secret";
public static final String OLD_SECRET = "password.encoder.old.secret"; public static final String PASSWORD_ENCODER_SECRET_DOC = "The secret used for encoding dynamically configured passwords for this broker.";
public static final String KEYFACTORY_ALGORITHM = "password.encoder.keyfactory.algorithm";
public static final String CIPHER_ALGORITHM = "password.encoder.cipher.algorithm"; public static final String PASSWORD_ENCODER_OLD_SECRET_CONFIG = "password.encoder.old.secret";
public static final String DEFAULT_CIPHER_ALGORITHM = "AES/CBC/PKCS5Padding"; public static final String PASSWORD_ENCODER_OLD_SECRET_DOC = "The old secret that was used for encoding dynamically configured passwords. " +
public static final String KEY_LENGTH = "password.encoder.key.length"; "This is required only when the secret is updated. If specified, all dynamically encoded passwords are " +
public static final int DEFAULT_KEY_LENGTH = 128; "decoded using this old secret and re-encoded using " + PASSWORD_ENCODER_SECRET_CONFIG + " when broker starts up.";
public static final String ITERATIONS = "password.encoder.iterations";
public static final int DEFAULT_ITERATIONS = 4096; public static final String PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG = "password.encoder.keyfactory.algorithm";
public static final String PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " +
"Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise.";
public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG = "password.encoder.cipher.algorithm";
public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC = "The Cipher algorithm used for encoding dynamically configured passwords.";
public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT = "AES/CBC/PKCS5Padding";
public static final String PASSWORD_ENCODER_KEY_LENGTH_CONFIG = "password.encoder.key.length";
public static final String PASSWORD_ENCODER_KEY_LENGTH_DOC = "The key length used for encoding dynamically configured passwords.";
public static final int PASSWORD_ENCODER_KEY_LENGTH_DEFAULT = 128;
public static final String PASSWORD_ENCODER_ITERATIONS_CONFIG = "password.encoder.iterations";
public static final String PASSWORD_ENCODER_ITERATIONS_DOC = "The iteration count used for encoding dynamically configured passwords.";
public static final int PASSWORD_ENCODER_ITERATIONS_DEFAULT = 4096;
} }

View File

@ -36,9 +36,9 @@ class PasswordEncoderTest {
public void testEncodeDecode() throws GeneralSecurityException { public void testEncodeDecode() throws GeneralSecurityException {
PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"), PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
null, null,
PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT,
PasswordEncoderConfigs.DEFAULT_KEY_LENGTH, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT,
PasswordEncoderConfigs.DEFAULT_ITERATIONS); PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT);
String password = "test-password"; String password = "test-password";
String encoded = encoder.encode(new Password(password)); String encoded = encoder.encode(new Password(password));
Map<String, String> encodedMap = Csv.parseCsvMap(encoded); Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
@ -96,10 +96,10 @@ class PasswordEncoderTest {
verifyEncodeDecode(null, "AES/CBC/PKCS5Padding", 128); verifyEncodeDecode(null, "AES/CBC/PKCS5Padding", 128);
verifyEncodeDecode(null, "AES/CFB/PKCS5Padding", 128); verifyEncodeDecode(null, "AES/CFB/PKCS5Padding", 128);
verifyEncodeDecode(null, "AES/OFB/PKCS5Padding", 128); verifyEncodeDecode(null, "AES/OFB/PKCS5Padding", 128);
verifyEncodeDecode("PBKDF2WithHmacSHA1", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128); verifyEncodeDecode("PBKDF2WithHmacSHA1", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128);
verifyEncodeDecode(null, "AES/GCM/NoPadding", 128); verifyEncodeDecode(null, "AES/GCM/NoPadding", 128);
verifyEncodeDecode("PBKDF2WithHmacSHA256", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128); verifyEncodeDecode("PBKDF2WithHmacSHA256", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128);
verifyEncodeDecode("PBKDF2WithHmacSHA512", PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM, 128); verifyEncodeDecode("PBKDF2WithHmacSHA512", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128);
} }
private void verifyEncodeDecode(String keyFactoryAlg, String cipherAlg, int keyLength) throws GeneralSecurityException { private void verifyEncodeDecode(String keyFactoryAlg, String cipherAlg, int keyLength) throws GeneralSecurityException {
@ -107,7 +107,7 @@ class PasswordEncoderTest {
keyFactoryAlg, keyFactoryAlg,
cipherAlg, cipherAlg,
keyLength, keyLength,
PasswordEncoderConfigs.DEFAULT_ITERATIONS); PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT);
String password = "test-password"; String password = "test-password";
String encoded = encoder.encode(new Password(password)); String encoded = encoder.encode(new Password(password));
verifyEncodedPassword(encoder, password, encoded); verifyEncodedPassword(encoder, password, encoded);

View File

@ -21,7 +21,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.raft.RaftConfig; import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.security.PasswordEncoderConfigs;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -119,10 +118,6 @@ public class Defaults {
public static final long DELEGATION_TOKEN_EXPIRY_TIME_MS = 24 * 60 * 60 * 1000L; public static final long DELEGATION_TOKEN_EXPIRY_TIME_MS = 24 * 60 * 60 * 1000L;
public static final long DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS = 1 * 60 * 60 * 1000L; public static final long DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS = 1 * 60 * 60 * 1000L;
/** ********* Password Encryption Configuration for Dynamic Configs *********/
public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM = PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM;
public static final int PASSWORD_ENCODER_KEY_LENGTH = PasswordEncoderConfigs.DEFAULT_KEY_LENGTH;
public static final int PASSWORD_ENCODER_ITERATIONS = PasswordEncoderConfigs.DEFAULT_ITERATIONS;
/** ********* Raft Quorum Configuration *********/ /** ********* Raft Quorum Configuration *********/
public static final List<String> QUORUM_VOTERS = RaftConfig.DEFAULT_QUORUM_VOTERS; public static final List<String> QUORUM_VOTERS = RaftConfig.DEFAULT_QUORUM_VOTERS;