KAFKA-18361 Remove PasswordEncoderConfigs (#18347)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2024-12-31 00:18:23 +08:00 committed by Chia-Ping Tsai
parent defababe38
commit 3575b41ba6
9 changed files with 8 additions and 301 deletions

View File

@ -213,11 +213,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val lock = new ReentrantReadWriteLock
private var metricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin] = _
private var currentConfig: KafkaConfig = _
private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
} else {
Some(PasswordEncoder.NOOP)
}
private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP)
private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false)
@ -373,16 +369,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
})
}
private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
secret.map { secret =>
PasswordEncoder.encrypting(secret,
kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
kafkaConfig.passwordEncoderCipherAlgorithm,
kafkaConfig.passwordEncoderKeyLength,
kafkaConfig.passwordEncoderIterations)
}
}
private def passwordEncoder: PasswordEncoder = {
dynamicConfigPasswordEncoder.getOrElse(throw new ConfigException("Password encoder secret not configured"))
}
@ -446,25 +432,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
// encoded using the current secret. Ignore any errors during decoding since old secret may not
// have been removed during broker restart.
private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
val props = persistentProps.clone().asInstanceOf[Properties]
if (props.asScala.keySet.exists(isPasswordConfig)) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
persistentProps.asScala.foreachEntry { (configName, value) =>
if (isPasswordConfig(configName) && value != null) {
val decoded = try {
Some(passwordDecoder.decode(value).value)
} catch {
case _: Exception =>
debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.")
None
}
decoded.foreach(value => props.put(configName, passwordEncoder.encode(new Password(value))))
}
}
adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props)
}
}
props
persistentProps.clone().asInstanceOf[Properties]
}
/**

View File

@ -40,7 +40,6 @@ import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, Transacti
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
@ -591,14 +590,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val delegationTokenExpiryTimeMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG)
val delegationTokenExpiryCheckIntervalMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG)
/** ********* Password encryption configuration for dynamic configs *********/
def passwordEncoderSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG))
def passwordEncoderOldSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG))
def passwordEncoderCipherAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG)
def passwordEncoderKeyFactoryAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG)
def passwordEncoderKeyLength = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG)
def passwordEncoderIterations = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG)
/** ********* Fetch Configuration **************/
val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG)
val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG)

View File

@ -61,7 +61,6 @@ import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG;
import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
@ -182,7 +181,6 @@ public class ConfigCommandIntegrationTest {
configs.put("listener.name.external.ssl.keystore.password", "secret");
configs.put("log.cleaner.threads", "2");
// Password encoder configs
configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
// Password config update at default cluster-level should fail
assertThrows(ExecutionException.class,

View File

@ -27,13 +27,11 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.utils.TestUtils
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SaslConfigs, SslConfigs}
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@ -369,7 +367,6 @@ class DynamicBrokerConfigTest {
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = {
val configProps = TestUtils.createBrokerConfig(0, null, port = 8181)
configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret")
val config = KafkaConfig(configProps)
config.dynamicConfig.initialize(None, None)
@ -416,61 +413,6 @@ class DynamicBrokerConfigTest {
}
}
@Test
def testPasswordConfigNotEncryption(): Unit = {
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
val configWithoutSecret = KafkaConfig(props)
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val configWithSecret = KafkaConfig(props)
val dynamicProps = new Properties
val password = "myLoginModule required;"
dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password)
try {
configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
} catch {
case _: ConfigException => // expected exception
}
val persistedProps = configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
}
@Test
def testPasswordConfigEncoderSecretChange(): Unit = {
val props = TestUtils.createBrokerConfig(0, null, port = 8181)
props.put(SaslConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
val config = KafkaConfig(props)
config.dynamicConfig.initialize(None, None)
val dynamicProps = new Properties
val password = "dynamicLoginModule required;"
dynamicProps.put(SaslConfigs.SASL_JAAS_CONFIG, password)
val persistedProps = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
assertEquals(password, persistedProps.getProperty(SaslConfigs.SASL_JAAS_CONFIG))
config.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals(password, config.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
// New config with same secret should use the dynamic password config
val newConfigWithSameSecret = KafkaConfig(props)
newConfigWithSameSecret.dynamicConfig.initialize(None, None)
newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
// New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "new-encoder-secret")
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, "config-encoder-secret")
val newConfigWithNewAndOldSecret = KafkaConfig(props)
newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals(password, newConfigWithSameSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
// New config with new secret alone should revert to static password config since dynamic config cannot be decoded
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "another-new-encoder-secret")
val newConfigWithNewSecret = KafkaConfig(props)
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
}
@Test
def testDynamicListenerConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, null, port = 9092)

View File

@ -37,7 +37,6 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZkConfigs}
@ -1005,14 +1004,6 @@ class KafkaConfigTest {
// Security config
case SecurityConfig.SECURITY_PROVIDERS_CONFIG =>
// Password encoder configs
case PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG =>
case PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG =>
case PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG =>
case PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG =>
case PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
case PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
//delegation token configs
case DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG => // ignore
case DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")

View File

@ -63,6 +63,10 @@
</li>
<li>The <code>log.message.format.version</code> and <code>message.format.version</code> configs were removed.
</li>
<li>The password encoder related configs (<code>password.encoder.secret</code>, <code>password.encoder.old.secret</code>,
<code>password.encoder.keyfactory.algorithm</code>, <code>password.encoder.cipher.algorithm</code>,
<code>password.encoder.key.length</code>, and <code>password.encoder.iterations</code>) were removed.
</li>
<li>The function <code>onNewBatch</code> in <code>org.apache.kafka.clients.producer.Partitioner</code> class was removed.
</li>
</ul>

View File

@ -1,60 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.security;
import org.apache.kafka.common.config.ConfigDef;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class PasswordEncoderConfigs {
public static final String PASSWORD_ENCODER_SECRET_CONFIG = "password.encoder.secret";
public static final String PASSWORD_ENCODER_SECRET_DOC = "The secret used for encoding dynamically configured passwords for this broker.";
public static final String PASSWORD_ENCODER_OLD_SECRET_CONFIG = "password.encoder.old.secret";
public static final String PASSWORD_ENCODER_OLD_SECRET_DOC = "The old secret that was used for encoding dynamically configured passwords. " +
"This is required only when the secret is updated. If specified, all dynamically encoded passwords are " +
"decoded using this old secret and re-encoded using " + PASSWORD_ENCODER_SECRET_CONFIG + " when broker starts up.";
public static final String PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG = "password.encoder.keyfactory.algorithm";
public static final String PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " +
"Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise.";
public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG = "password.encoder.cipher.algorithm";
public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC = "The Cipher algorithm used for encoding dynamically configured passwords.";
public static final String PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT = "AES/CBC/PKCS5Padding";
public static final String PASSWORD_ENCODER_KEY_LENGTH_CONFIG = "password.encoder.key.length";
public static final String PASSWORD_ENCODER_KEY_LENGTH_DOC = "The key length used for encoding dynamically configured passwords.";
public static final int PASSWORD_ENCODER_KEY_LENGTH_DEFAULT = 128;
public static final String PASSWORD_ENCODER_ITERATIONS_CONFIG = "password.encoder.iterations";
public static final String PASSWORD_ENCODER_ITERATIONS_DOC = "The iteration count used for encoding dynamically configured passwords.";
public static final int PASSWORD_ENCODER_ITERATIONS_DEFAULT = 4096;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, STRING, null, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, STRING, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, atLeast(8), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DOC)
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT, atLeast(1024), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DOC);
}

View File

@ -1,125 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.security;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.server.util.Csv;
import org.junit.jupiter.api.Test;
import java.security.GeneralSecurityException;
import java.util.Map;
import javax.crypto.SecretKeyFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
class PasswordEncoderTest {
@Test
public void testEncodeDecode() throws GeneralSecurityException {
PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
null,
PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT,
PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT,
PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT);
String password = "test-password";
String encoded = encoder.encode(new Password(password));
Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
assertEquals("4096", encodedMap.get(PasswordEncoder.ITERATIONS));
assertEquals("128", encodedMap.get(PasswordEncoder.KEY_LENGTH));
String defaultKeyFactoryAlgorithm;
try {
SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512");
defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA512";
} catch (Exception e) {
defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA1";
}
assertEquals(defaultKeyFactoryAlgorithm, encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
assertEquals("AES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
verifyEncodedPassword(encoder, password, encoded);
}
@Test
public void testEncoderConfigChange() throws GeneralSecurityException {
PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
"PBKDF2WithHmacSHA1",
"DES/CBC/PKCS5Padding",
64,
1024);
String password = "test-password";
String encoded = encoder.encode(new Password(password));
Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
assertEquals("1024", encodedMap.get(PasswordEncoder.ITERATIONS));
assertEquals("64", encodedMap.get(PasswordEncoder.KEY_LENGTH));
assertEquals("PBKDF2WithHmacSHA1", encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
assertEquals("DES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
// Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
PasswordEncoder decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
"PBKDF2WithHmacSHA1",
"AES/CBC/PKCS5Padding",
128,
2048);
assertEquals(password, decoder.decode(encoded).value());
// Test that decoding fails if secret is altered
PasswordEncoder decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
"PBKDF2WithHmacSHA1",
"AES/CBC/PKCS5Padding",
128,
1024);
assertThrows(ConfigException.class, () -> decoder2.decode(encoded));
}
@Test
public void testEncodeDecodeAlgorithms() throws GeneralSecurityException {
verifyEncodeDecode(null, "DES/CBC/PKCS5Padding", 64);
verifyEncodeDecode(null, "DESede/CBC/PKCS5Padding", 192);
verifyEncodeDecode(null, "AES/CBC/PKCS5Padding", 128);
verifyEncodeDecode(null, "AES/CFB/PKCS5Padding", 128);
verifyEncodeDecode(null, "AES/OFB/PKCS5Padding", 128);
verifyEncodeDecode("PBKDF2WithHmacSHA1", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128);
verifyEncodeDecode(null, "AES/GCM/NoPadding", 128);
verifyEncodeDecode("PBKDF2WithHmacSHA256", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128);
verifyEncodeDecode("PBKDF2WithHmacSHA512", PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, 128);
}
private void verifyEncodeDecode(String keyFactoryAlg, String cipherAlg, int keyLength) throws GeneralSecurityException {
PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
keyFactoryAlg,
cipherAlg,
keyLength,
PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT);
String password = "test-password";
String encoded = encoder.encode(new Password(password));
verifyEncodedPassword(encoder, password, encoded);
}
private void verifyEncodedPassword(PasswordEncoder encoder, String password, String encoded) throws GeneralSecurityException {
Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
assertEquals(String.valueOf(password.length()), encodedMap.get(PasswordEncoder.PASSWORD_LENGTH));
assertNotNull(PasswordEncoder.base64Decode(encodedMap.get("salt")), "Invalid salt");
assertNotNull(PasswordEncoder.base64Decode(encodedMap.get(PasswordEncoder.INITIALIZATION_VECTOR)), "Invalid encoding parameters");
assertNotNull(PasswordEncoder.base64Decode(encodedMap.get(PasswordEncoder.ENCRYPTED_PASSWORD)), "Invalid encoded password");
assertEquals(password, encoder.decode(encoded).value());
}
}

View File

@ -26,7 +26,6 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.security.PasswordEncoderConfigs;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.metrics.MetricConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;
@ -64,8 +63,7 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
MetricConfigs.CONFIG_DEF,
QuotaConfig.CONFIG_DEF,
BrokerSecurityConfigs.CONFIG_DEF,
DelegationTokenManagerConfigs.CONFIG_DEF,
PasswordEncoderConfigs.CONFIG_DEF
DelegationTokenManagerConfigs.CONFIG_DEF
));
public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {