mirror of https://github.com/apache/kafka.git
KAFKA-10692: Add delegation.token.secret.key, deprecate ...master.key (#9623)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
parent
1625984149
commit
91679f247a
|
@ -171,7 +171,7 @@ class DelegationTokenManager(val config: KafkaConfig,
|
|||
type DescribeResponseCallback = (Errors, List[DelegationToken]) => Unit
|
||||
|
||||
val secretKey = {
|
||||
val keyBytes = if (config.tokenAuthEnabled) config.delegationTokenMasterKey.value.getBytes(StandardCharsets.UTF_8) else null
|
||||
val keyBytes = if (config.tokenAuthEnabled) config.delegationTokenSecretKey.value.getBytes(StandardCharsets.UTF_8) else null
|
||||
if (keyBytes == null || keyBytes.length == 0) null
|
||||
else
|
||||
createSecretKey(keyBytes)
|
||||
|
|
|
@ -558,7 +558,8 @@ object KafkaConfig {
|
|||
val SaslLoginRefreshBufferSecondsProp = SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS
|
||||
|
||||
/** ********* Delegation Token Configuration ****************/
|
||||
val DelegationTokenMasterKeyProp = "delegation.token.master.key"
|
||||
val DelegationTokenSecretKeyAliasProp = "delegation.token.master.key"
|
||||
val DelegationTokenSecretKeyProp = "delegation.token.secret.key"
|
||||
val DelegationTokenMaxLifeTimeProp = "delegation.token.max.lifetime.ms"
|
||||
val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms"
|
||||
val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
|
||||
|
@ -966,7 +967,8 @@ object KafkaConfig {
|
|||
val SaslLoginRefreshBufferSecondsDoc = SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
|
||||
|
||||
/** ********* Delegation Token Configuration ****************/
|
||||
val DelegationTokenMasterKeyDoc = "Master/secret key to generate and verify delegation tokens. Same key must be configured across all the brokers. " +
|
||||
val DelegationTokenSecretKeyAliasDoc = s"DEPRECATED: An alias for $DelegationTokenSecretKeyProp, which should be used instead of this config."
|
||||
val DelegationTokenSecretKeyDoc = "Secret key to generate and verify delegation tokens. The same key must be configured across all the brokers. " +
|
||||
" If the key is not set or set to empty string, brokers will disable the delegation token support."
|
||||
val DelegationTokenMaxLifeTimeDoc = "The token has a maximum lifetime beyond which it cannot be renewed anymore. Default value 7 days."
|
||||
val DelegationTokenExpiryTimeMsDoc = "The token validity time in miliseconds before the token needs to be renewed. Default value 1 day."
|
||||
|
@ -1240,7 +1242,8 @@ object KafkaConfig {
|
|||
.define(SaslLoginRefreshMinPeriodSecondsProp, SHORT, Defaults.SaslLoginRefreshMinPeriodSeconds, MEDIUM, SaslLoginRefreshMinPeriodSecondsDoc)
|
||||
.define(SaslLoginRefreshBufferSecondsProp, SHORT, Defaults.SaslLoginRefreshBufferSeconds, MEDIUM, SaslLoginRefreshBufferSecondsDoc)
|
||||
/** ********* Delegation Token Configuration ****************/
|
||||
.define(DelegationTokenMasterKeyProp, PASSWORD, null, MEDIUM, DelegationTokenMasterKeyDoc)
|
||||
.define(DelegationTokenSecretKeyAliasProp, PASSWORD, null, MEDIUM, DelegationTokenSecretKeyAliasDoc)
|
||||
.define(DelegationTokenSecretKeyProp, PASSWORD, null, MEDIUM, DelegationTokenSecretKeyDoc)
|
||||
.define(DelegationTokenMaxLifeTimeProp, LONG, Defaults.DelegationTokenMaxLifeTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenMaxLifeTimeDoc)
|
||||
.define(DelegationTokenExpiryTimeMsProp, LONG, Defaults.DelegationTokenExpiryTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenExpiryTimeMsDoc)
|
||||
.define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DelegationTokenExpiryCheckIntervalMsDefault, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc)
|
||||
|
@ -1619,8 +1622,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
|
|||
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
|
||||
|
||||
/** ********* DelegationToken Configuration **************/
|
||||
val delegationTokenMasterKey = getPassword(KafkaConfig.DelegationTokenMasterKeyProp)
|
||||
val tokenAuthEnabled = (delegationTokenMasterKey != null && !delegationTokenMasterKey.value.isEmpty)
|
||||
val delegationTokenSecretKey = Option(getPassword(KafkaConfig.DelegationTokenSecretKeyProp))
|
||||
.getOrElse(getPassword(KafkaConfig.DelegationTokenSecretKeyAliasProp))
|
||||
val tokenAuthEnabled = (delegationTokenSecretKey != null && !delegationTokenSecretKey.value.isEmpty)
|
||||
val delegationTokenMaxLifeMs = getLong(KafkaConfig.DelegationTokenMaxLifeTimeProp)
|
||||
val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp)
|
||||
val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
|
||||
|
|
|
@ -49,7 +49,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
|
|||
|
||||
private val privilegedAdminClientConfig = new Properties()
|
||||
|
||||
this.serverConfig.setProperty(KafkaConfig.DelegationTokenMasterKeyProp, "testKey")
|
||||
this.serverConfig.setProperty(KafkaConfig.DelegationTokenSecretKeyProp, "testKey")
|
||||
|
||||
override def configureSecurityBeforeServersStart(): Unit = {
|
||||
super.configureSecurityBeforeServersStart()
|
||||
|
|
|
@ -53,7 +53,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
|
|||
val renewer = List(SecurityUtils.parseKafkaPrincipal("User:renewer1"))
|
||||
val tokenManagers = Buffer[DelegationTokenManager]()
|
||||
|
||||
val masterKey = "masterKey"
|
||||
val secretKey = "secretKey"
|
||||
val maxLifeTimeMsDefault = Defaults.DelegationTokenMaxLifeTimeMsDefault
|
||||
val renewTimeMsDefault = Defaults.DelegationTokenExpiryTimeMsDefault
|
||||
var tokenCache: DelegationTokenCache = null
|
||||
|
@ -68,7 +68,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
|
|||
super.setUp()
|
||||
props = TestUtils.createBrokerConfig(0, zkConnect, enableToken = true)
|
||||
props.put(KafkaConfig.SaslEnabledMechanismsProp, ScramMechanism.mechanismNames().asScala.mkString(","))
|
||||
props.put(KafkaConfig.DelegationTokenMasterKeyProp, masterKey)
|
||||
props.put(KafkaConfig.DelegationTokenSecretKeyProp, secretKey)
|
||||
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames())
|
||||
}
|
||||
|
||||
|
@ -104,7 +104,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
|
|||
tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
|
||||
val issueTime = time.milliseconds
|
||||
val tokenId = createTokenResult.tokenId
|
||||
val password = DelegationTokenManager.createHmac(tokenId, masterKey)
|
||||
val password = DelegationTokenManager.createHmac(tokenId, secretKey)
|
||||
assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault, issueTime + maxLifeTimeMsDefault, tokenId, password, Errors.NONE), createTokenResult)
|
||||
|
||||
val token = tokenManager.getToken(tokenId)
|
||||
|
@ -122,7 +122,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
|
|||
val issueTime = time.milliseconds
|
||||
val maxLifeTime = issueTime + maxLifeTimeMsDefault
|
||||
val tokenId = createTokenResult.tokenId
|
||||
val password = DelegationTokenManager.createHmac(tokenId, masterKey)
|
||||
val password = DelegationTokenManager.createHmac(tokenId, secretKey)
|
||||
assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault, maxLifeTime, tokenId, password, Errors.NONE), createTokenResult)
|
||||
|
||||
//try renewing non-existing token
|
||||
|
@ -169,7 +169,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
|
|||
tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
|
||||
val issueTime = time.milliseconds
|
||||
val tokenId = createTokenResult.tokenId
|
||||
val password = DelegationTokenManager.createHmac(tokenId, masterKey)
|
||||
val password = DelegationTokenManager.createHmac(tokenId, secretKey)
|
||||
assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault, issueTime + maxLifeTimeMsDefault, tokenId, password, Errors.NONE), createTokenResult)
|
||||
|
||||
//try expire non-existing token
|
||||
|
@ -204,7 +204,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
|
|||
tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
|
||||
val issueTime = time.milliseconds
|
||||
val tokenId = createTokenResult.tokenId
|
||||
val password = DelegationTokenManager.createHmac(tokenId, masterKey)
|
||||
val password = DelegationTokenManager.createHmac(tokenId, secretKey)
|
||||
assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault, issueTime + maxLifeTimeMsDefault, tokenId, password, Errors.NONE), createTokenResult)
|
||||
|
||||
// expire the token immediately
|
||||
|
|
|
@ -785,7 +785,8 @@ class KafkaConfigTest {
|
|||
case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1", "0")
|
||||
|
||||
//delegation token configs
|
||||
case KafkaConfig.DelegationTokenMasterKeyProp => // ignore
|
||||
case KafkaConfig.DelegationTokenSecretKeyAliasProp => // ignore
|
||||
case KafkaConfig.DelegationTokenSecretKeyProp => // ignore
|
||||
case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
|
||||
case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
|
||||
case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
|
||||
|
@ -925,7 +926,7 @@ class KafkaConfigTest {
|
|||
assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
|
||||
assertEquals(1 * 60L * 1000L * 60, config.delegationTokenExpiryCheckIntervalMs)
|
||||
|
||||
defaults.put(KafkaConfig.DelegationTokenMasterKeyProp, "1234567890")
|
||||
defaults.put(KafkaConfig.DelegationTokenSecretKeyProp, "1234567890")
|
||||
val config1 = KafkaConfig.fromProps(defaults)
|
||||
assertEquals(true, config1.tokenAuthEnabled)
|
||||
}
|
||||
|
|
|
@ -313,7 +313,7 @@ object TestUtils extends Logging {
|
|||
}
|
||||
|
||||
if (enableToken)
|
||||
props.put(KafkaConfig.DelegationTokenMasterKeyProp, "masterkey")
|
||||
props.put(KafkaConfig.DelegationTokenSecretKeyProp, "secretkey")
|
||||
|
||||
props.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
|
||||
props.put(KafkaConfig.DefaultReplicationFactorProp, defaultReplicationFactor.toString)
|
||||
|
|
|
@ -1059,12 +1059,12 @@ keyUsage = digitalSignature, keyEncipherment</code></pre>
|
|||
|
||||
<ol>
|
||||
<li><h5 class="anchor-heading"><a id="security_token_management" class="anchor-link"></a><a href="#security_token_management">Token Management</a></h5>
|
||||
<p> A master key/secret is used to generate and verify delegation tokens. This is supplied using config
|
||||
option <tt>delegation.token.master.key</tt>. Same secret key must be configured across all the brokers.
|
||||
<p> A secret is used to generate and verify delegation tokens. This is supplied using config
|
||||
option <tt>delegation.token.secret.key</tt>. The same secret key must be configured across all the brokers.
|
||||
If the secret is not set or set to empty string, brokers will disable the delegation token authentication.</p>
|
||||
|
||||
<p>In current implementation, token details are stored in Zookeeper and is suitable for use in Kafka installations where
|
||||
Zookeeper is on a private network. Also currently, master key/secret is stored as plain text in server.properties
|
||||
<p>In the current implementation, token details are stored in Zookeeper and is suitable for use in Kafka installations where
|
||||
Zookeeper is on a private network. Also currently, this secret is stored as plain text in the server.properties
|
||||
config file. We intend to make these configurable in a future Kafka release.</p>
|
||||
|
||||
<p>A token has a current life, and a maximum renewable life. By default, tokens must be renewed once every 24 hours
|
||||
|
|
|
@ -55,7 +55,7 @@ OFFSETS_TOPIC_NUM_PARTITIONS = "offsets.topic.num.partitions"
|
|||
|
||||
DELEGATION_TOKEN_MAX_LIFETIME_MS="delegation.token.max.lifetime.ms"
|
||||
DELEGATION_TOKEN_EXPIRY_TIME_MS="delegation.token.expiry.time.ms"
|
||||
DELEGATION_TOKEN_MASTER_KEY="delegation.token.master.key"
|
||||
DELEGATION_TOKEN_SECRET_KEY="delegation.token.secret.key"
|
||||
SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
|
||||
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ class DelegationTokenTest(Test):
|
|||
server_prop_overides=[
|
||||
[config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS, "604800000"],
|
||||
[config_property.DELEGATION_TOKEN_EXPIRY_TIME_MS, "86400000"],
|
||||
[config_property.DELEGATION_TOKEN_MASTER_KEY, "test12345"],
|
||||
[config_property.DELEGATION_TOKEN_SECRET_KEY, "test12345"],
|
||||
[config_property.SASL_ENABLED_MECHANISMS, "GSSAPI,SCRAM-SHA-256"]
|
||||
])
|
||||
self.jaas_deleg_conf_path = "/tmp/jaas_deleg.conf"
|
||||
|
|
Loading…
Reference in New Issue