KAFKA-19113: Migrate DelegationTokenManager to server module (#19424)

1. Migrate DelegationTokenManager to server module.
2. Rewrite DelegationTokenManager in Java.
3. Move DelegationTokenManagerConfigs out of KafkaConfig.

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
PoAn Yang 2025-04-14 22:49:45 +08:00 committed by GitHub
parent dced8bfb38
commit 8827ce4701
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 266 additions and 182 deletions

View File

@ -82,6 +82,7 @@
<allow pkg="org.apache.kafka.raft" /> <allow pkg="org.apache.kafka.raft" />
<subpackage name="server"> <subpackage name="server">
<allow pkg="javax.crypto" />
<allow pkg="org.apache.kafka.server" /> <allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.image" /> <allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.storage.internals.log" /> <allow pkg="org.apache.kafka.storage.internals.log" />

View File

@ -21,7 +21,6 @@ import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel; import kafka.network.RequestChannel;
import kafka.server.ApiVersionManager; import kafka.server.ApiVersionManager;
import kafka.server.AutoTopicCreationManager; import kafka.server.AutoTopicCreationManager;
import kafka.server.DelegationTokenManager;
import kafka.server.FetchManager; import kafka.server.FetchManager;
import kafka.server.ForwardingManager; import kafka.server.ForwardingManager;
import kafka.server.KafkaApis; import kafka.server.KafkaApis;
@ -38,6 +37,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator;
import org.apache.kafka.metadata.ConfigRepository; import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.metadata.MetadataCache; import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.ClientMetricsManager; import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.DelegationTokenManager;
import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

View File

@ -46,7 +46,7 @@ import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.security.CredentialProvider import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition} import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.ConfigType import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics} import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
@ -54,7 +54,7 @@ import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
import org.apache.kafka.server.share.session.ShareSessionCache import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper} import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler} import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DelayedActionQueue, ProcessRole} import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@ -355,7 +355,7 @@ class BrokerServer(
) )
/* start token manager */ /* start token manager */
tokenManager = new DelegationTokenManager(config, tokenCache, time) tokenManager = new DelegationTokenManager(new DelegationTokenManagerConfigs(config), tokenCache)
/* initializing the groupConfigManager */ /* initializing the groupConfigManager */
groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig)) groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))

View File

@ -56,7 +56,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply} import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal} import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
@ -970,7 +970,7 @@ class ControllerApis(
new RenewDelegationTokenResponseData() new RenewDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs) .setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code) .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp))) .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
CompletableFuture.completedFuture[Unit](()) CompletableFuture.completedFuture[Unit](())
} else { } else {
val context = new ControllerRequestContext( val context = new ControllerRequestContext(
@ -994,7 +994,7 @@ class ControllerApis(
new ExpireDelegationTokenResponseData() new ExpireDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs) .setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code) .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp))) .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
CompletableFuture.completedFuture[Unit](()) CompletableFuture.completedFuture[Unit](())
} else { } else {
val context = new ControllerRequestContext( val context = new ControllerRequestContext(

View File

@ -40,11 +40,11 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher} import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG} import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager} import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.config.ConfigType import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@ -206,9 +206,10 @@ class ControllerServer(
QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled), QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled),
controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava) controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava)
val delegationTokenManagerConfigs = new DelegationTokenManagerConfigs(config)
val delegationTokenKeyString = { val delegationTokenKeyString = {
if (config.tokenAuthEnabled) { if (delegationTokenManagerConfigs.tokenAuthEnabled) {
config.delegationTokenSecretKey.value delegationTokenManagerConfigs.delegationTokenSecretKey.value
} else { } else {
null null
} }
@ -247,9 +248,9 @@ class ControllerServer(
setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler). setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler).
setDelegationTokenCache(tokenCache). setDelegationTokenCache(tokenCache).
setDelegationTokenSecretKey(delegationTokenKeyString). setDelegationTokenSecretKey(delegationTokenKeyString).
setDelegationTokenMaxLifeMs(config.delegationTokenMaxLifeMs). setDelegationTokenMaxLifeMs(delegationTokenManagerConfigs.delegationTokenMaxLifeMs).
setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs). setDelegationTokenExpiryTimeMs(delegationTokenManagerConfigs.delegationTokenExpiryTimeMs).
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs). setDelegationTokenExpiryCheckIntervalMs(delegationTokenManagerConfigs.delegationTokenExpiryCheckIntervalMs).
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs). setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
setInterBrokerListenerName(config.interBrokerListenerName.value()). setInterBrokerListenerName(config.interBrokerListenerName.value()).
setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs). setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs).
@ -361,7 +362,7 @@ class ControllerServer(
config, config,
sharedServer.metadataPublishingFaultHandler, sharedServer.metadataPublishingFaultHandler,
"controller", "controller",
new DelegationTokenManager(config, tokenCache, time) new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)
)) ))
// Set up the metrics publisher. // Set up the metrics publisher.

View File

@ -1,143 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.nio.charset.StandardCharsets
import java.security.InvalidKeyException
import javax.crypto.spec.SecretKeySpec
import javax.crypto.{Mac, SecretKey}
import kafka.utils.Logging
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.{ScramFormatter, ScramMechanism}
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.Time
import java.util
import java.util.stream.Collectors
import scala.jdk.CollectionConverters._
import scala.collection.mutable
object DelegationTokenManager {
private val DefaultHmacAlgorithm = "HmacSHA512"
val ErrorTimestamp = -1
/**
* Convert the byte[] to a secret key
* @param keybytes the byte[] to create the secret key from
* @return the secret key
*/
private def createSecretKey(keybytes: Array[Byte]) : SecretKey = {
new SecretKeySpec(keybytes, DefaultHmacAlgorithm)
}
/**
* Compute HMAC of the identifier using the secret key
* @param tokenId the bytes of the identifier
* @param secretKey the secret key
* @return String of the generated hmac
*/
def createHmac(tokenId: String, secretKey: SecretKey) : Array[Byte] = {
val mac = Mac.getInstance(DefaultHmacAlgorithm)
try
mac.init(secretKey)
catch {
case ike: InvalidKeyException => throw new IllegalArgumentException("Invalid key to HMAC computation", ike)
}
mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8))
}
def filterToken(requesterPrincipal: KafkaPrincipal, owners : Option[List[KafkaPrincipal]], token: TokenInformation,
authorizeToken: String => Boolean, authorizeRequester: KafkaPrincipal => Boolean) : Boolean = {
val allow =
//exclude tokens which are not requested
if (owners.isDefined && !owners.get.exists(owner => token.ownerOrRenewer(owner))) {
false
//Owners and the renewers can describe their own tokens
} else if (token.ownerOrRenewer(requesterPrincipal)) {
true
// Check permission for non-owned tokens
} else if (authorizeToken(token.tokenId) || authorizeRequester(token.owner)) {
true
}
else {
false
}
allow
}
}
class DelegationTokenManager(val config: KafkaConfig,
val tokenCache: DelegationTokenCache,
val time: Time) extends Logging {
this.logIdent = s"[Token Manager on Node ${config.brokerId}]: "
import DelegationTokenManager._
val secretKey: SecretKey = {
val keyBytes = if (config.tokenAuthEnabled) config.delegationTokenSecretKey.value.getBytes(StandardCharsets.UTF_8) else null
if (keyBytes == null || keyBytes.isEmpty) null
else
createSecretKey(keyBytes)
}
/**
* @param hmacString
*/
private def prepareScramCredentials(hmacString: String) : Map[String, ScramCredential] = {
val scramCredentialMap = mutable.Map[String, ScramCredential]()
def scramCredential(mechanism: ScramMechanism): ScramCredential = {
new ScramFormatter(mechanism).generateCredential(hmacString, mechanism.minIterations)
}
for (mechanism <- ScramMechanism.values)
scramCredentialMap(mechanism.mechanismName) = scramCredential(mechanism)
scramCredentialMap.toMap
}
/**
* @param token
*/
def updateToken(token: DelegationToken): Unit = {
val hmacString = token.hmacAsBase64String
val scramCredentialMap = prepareScramCredentials(hmacString)
tokenCache.updateCache(token, scramCredentialMap.asJava)
}
def getDelegationToken(tokenInfo: TokenInformation): DelegationToken = {
val hmac = createHmac(tokenInfo.tokenId, secretKey)
new DelegationToken(tokenInfo, hmac)
}
/**
*
* @param tokenId
*/
def removeToken(tokenId: String): Unit = {
tokenCache.removeCache(tokenId)
}
def getTokens(filterToken: TokenInformation => Boolean): util.List[DelegationToken] = {
tokenCache.tokens.stream().filter(token => filterToken(token)).map(getDelegationToken).collect(Collectors.toList[DelegationToken])
}
}

View File

@ -60,9 +60,10 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator} import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache} import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
import org.apache.kafka.server.{ClientMetricsManager, ProcessRole} import org.apache.kafka.server.{ClientMetricsManager, DelegationTokenManager, ProcessRole}
import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal, TransactionVersion} import org.apache.kafka.server.common.{GroupVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
import org.apache.kafka.server.share.context.ShareFetchContext import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey} import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, SharePartitionKey}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@ -2201,7 +2202,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new ExpireDelegationTokenResponseData() new ExpireDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs) .setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code) .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp))) .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
} else { } else {
forwardToController(request) forwardToController(request)
} }
@ -2214,7 +2215,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new RenewDelegationTokenResponseData() new RenewDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs) .setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code) .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
.setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp))) .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
} else { } else {
forwardToController(request) forwardToController(request)
} }
@ -2233,7 +2234,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!allowTokenRequests(request)) if (!allowTokenRequests(request))
sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, Collections.emptyList) sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, Collections.emptyList)
else if (!config.tokenAuthEnabled) else if (!new DelegationTokenManagerConfigs(config).tokenAuthEnabled)
sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, Collections.emptyList) sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, Collections.emptyList)
else { else {
val requestPrincipal = request.context.principal val requestPrincipal = request.context.principal
@ -2242,10 +2243,10 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseCallback(Errors.NONE, Collections.emptyList) sendResponseCallback(Errors.NONE, Collections.emptyList)
} }
else { else {
val owners = if (describeTokenRequest.data.owners == null) val owners: Optional[util.List[KafkaPrincipal]] = if (describeTokenRequest.data.owners == null)
None Optional.empty()
else else
Some(describeTokenRequest.data.owners.asScala.map(p => new KafkaPrincipal(p.principalType(), p.principalName)).toList) Optional.of(describeTokenRequest.data.owners.stream().map(p => new KafkaPrincipal(p.principalType(), p.principalName)).toList)
def authorizeToken(tokenId: String) = authHelper.authorize(request.context, DESCRIBE, DELEGATION_TOKEN, tokenId) def authorizeToken(tokenId: String) = authHelper.authorize(request.context, DESCRIBE, DELEGATION_TOKEN, tokenId)
def authorizeRequester(owner: KafkaPrincipal) = authHelper.authorize(request.context, DESCRIBE_TOKENS, USER, owner.toString) def authorizeRequester(owner: KafkaPrincipal) = authHelper.authorize(request.context, DESCRIBE_TOKENS, USER, owner.toString)
def eligible(token: TokenInformation) = DelegationTokenManager def eligible(token: TokenInformation) = DelegationTokenManager

View File

@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.util.Csv import org.apache.kafka.server.util.Csv
@ -429,13 +429,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2 def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
def saslMechanismInterBrokerProtocol = getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG) def saslMechanismInterBrokerProtocol = getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG)
/** ********* DelegationToken Configuration **************/
val delegationTokenSecretKey = getPassword(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG)
val tokenAuthEnabled = delegationTokenSecretKey != null && delegationTokenSecretKey.value.nonEmpty
val delegationTokenMaxLifeMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG)
val delegationTokenExpiryTimeMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG)
val delegationTokenExpiryCheckIntervalMs = getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG)
/** ********* Fetch Configuration **************/ /** ********* Fetch Configuration **************/
val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG) val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG)
val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG) val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG)

View File

@ -17,11 +17,11 @@
package kafka.server.metadata package kafka.server.metadata
import kafka.server.DelegationTokenManager
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.DelegationTokenManager
import org.apache.kafka.server.fault.FaultHandler import org.apache.kafka.server.fault.FaultHandler

View File

@ -1173,14 +1173,6 @@ class KafkaConfigTest {
assertEquals(123L, config.logFlushIntervalMs) assertEquals(123L, config.logFlushIntervalMs)
assertEquals(CompressionType.SNAPPY, config.groupCoordinatorConfig.offsetTopicCompressionType) assertEquals(CompressionType.SNAPPY, config.groupCoordinatorConfig.offsetTopicCompressionType)
assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel) assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
assertEquals(false, config.tokenAuthEnabled)
assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs)
assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
assertEquals(1 * 60L * 1000L * 60, config.delegationTokenExpiryCheckIntervalMs)
defaults.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, "1234567890")
val config1 = KafkaConfig.fromProps(defaults)
assertEquals(true, config1.tokenAuthEnabled)
} }
@Test @Test

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.server.config.DelegationTokenManagerConfigs;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
public class DelegationTokenManager {
private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA512";
public static final long ERROR_TIMESTAMP = -1;
private final DelegationTokenCache tokenCache;
private final SecretKey secretKey;
public DelegationTokenManager(DelegationTokenManagerConfigs config, DelegationTokenCache tokenCache) {
this.tokenCache = tokenCache;
byte[] keyBytes = config.tokenAuthEnabled() ? config.delegationTokenSecretKey().value().getBytes(StandardCharsets.UTF_8) : null;
if (keyBytes == null || keyBytes.length == 0) {
this.secretKey = null;
} else {
this.secretKey = createSecretKey(keyBytes);
}
}
private static SecretKey createSecretKey(byte[] keyBytes) {
return new SecretKeySpec(keyBytes, DEFAULT_HMAC_ALGORITHM);
}
public static byte[] createHmac(String tokenId, SecretKey secretKey) {
try {
Mac mac = Mac.getInstance(DEFAULT_HMAC_ALGORITHM);
mac.init(secretKey);
return mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8));
} catch (InvalidKeyException e) {
throw new IllegalArgumentException("Invalid key to HMAC computation", e);
} catch (Exception e) {
throw new RuntimeException("Error while creating HMAC", e);
}
}
private Map<String, ScramCredential> prepareScramCredentials(String hmacString) throws NoSuchAlgorithmException {
Map<String, ScramCredential> scramCredentialMap = new HashMap<>();
for (ScramMechanism mechanism : ScramMechanism.values()) {
ScramFormatter formatter = new ScramFormatter(mechanism);
scramCredentialMap.put(mechanism.mechanismName(), formatter.generateCredential(hmacString, mechanism.minIterations()));
}
return scramCredentialMap;
}
public void updateToken(DelegationToken token) throws NoSuchAlgorithmException {
String hmacString = token.hmacAsBase64String();
Map<String, ScramCredential> scramCredentialMap = prepareScramCredentials(hmacString);
tokenCache.updateCache(token, scramCredentialMap);
}
public DelegationToken getDelegationToken(TokenInformation tokenInfo) {
byte[] hmac = createHmac(tokenInfo.tokenId(), secretKey);
return new DelegationToken(tokenInfo, hmac);
}
public void removeToken(String tokenId) {
tokenCache.removeCache(tokenId);
}
public List<DelegationToken> getTokens(Predicate<TokenInformation> filterToken) {
return tokenCache.tokens().stream()
.filter(filterToken)
.map(this::getDelegationToken)
.toList();
}
public static boolean filterToken(
KafkaPrincipal requesterPrincipal,
Optional<List<KafkaPrincipal>> owners,
TokenInformation token,
Function<String, Boolean> authorizeToken,
Function<KafkaPrincipal, Boolean> authorizeRequester
) {
if (owners.isPresent() && owners.get().stream().noneMatch(token::ownerOrRenewer)) {
//exclude tokens which are not requested
return false;
} else if (token.ownerOrRenewer(requesterPrincipal)) {
//Owners and the renewers can describe their own tokens
return true;
} else {
// Check permission for non-owned tokens
return authorizeToken.apply(token.tokenId()) || authorizeRequester.apply(token.owner());
}
}
}

View File

@ -16,7 +16,9 @@
*/ */
package org.apache.kafka.server.config; package org.apache.kafka.server.config;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.types.Password;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
@ -49,4 +51,38 @@ public class DelegationTokenManagerConfigs {
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, atLeast(1), MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_DOC) .define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, atLeast(1), MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DEFAULT, atLeast(1), MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DOC) .define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DEFAULT, atLeast(1), MEDIUM, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_DOC); .define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG, LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_DOC);
private final Password delegationTokenSecretKey;
private final boolean tokenAuthEnabled;
private final long delegationTokenMaxLifeMs;
private final long delegationTokenExpiryTimeMs;
private final long delegationTokenExpiryCheckIntervalMs;
public DelegationTokenManagerConfigs(AbstractConfig config) {
this.delegationTokenSecretKey = config.getPassword(DELEGATION_TOKEN_SECRET_KEY_CONFIG);
this.tokenAuthEnabled = delegationTokenSecretKey != null && !delegationTokenSecretKey.value().isEmpty();
this.delegationTokenMaxLifeMs = config.getLong(DELEGATION_TOKEN_MAX_LIFETIME_CONFIG);
this.delegationTokenExpiryTimeMs = config.getLong(DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG);
this.delegationTokenExpiryCheckIntervalMs = config.getLong(DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG);
}
public Password delegationTokenSecretKey() {
return delegationTokenSecretKey;
}
public boolean tokenAuthEnabled() {
return tokenAuthEnabled;
}
public long delegationTokenMaxLifeMs() {
return delegationTokenMaxLifeMs;
}
public long delegationTokenExpiryTimeMs() {
return delegationTokenExpiryTimeMs;
}
public long delegationTokenExpiryCheckIntervalMs() {
return delegationTokenExpiryCheckIntervalMs;
}
} }

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.config;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.types.Password;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Map;
import static org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG;
import static org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG;
import static org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG;
import static org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class DelegationTokenManagerConfigsTest {
@Test
void testDefaults() {
DelegationTokenManagerConfigs config = new DelegationTokenManagerConfigs(new AbstractConfig(DelegationTokenManagerConfigs.CONFIG_DEF, Map.of()));
assertNull(config.delegationTokenSecretKey());
assertFalse(config.tokenAuthEnabled());
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, config.delegationTokenMaxLifeMs());
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DEFAULT, config.delegationTokenExpiryTimeMs());
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT, config.delegationTokenExpiryCheckIntervalMs());
}
@Test
void testOverride() {
DelegationTokenManagerConfigs config = new DelegationTokenManagerConfigs(
new AbstractConfig(DelegationTokenManagerConfigs.CONFIG_DEF,
Map.of(
DELEGATION_TOKEN_SECRET_KEY_CONFIG, "test",
DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, "500",
DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG, "200",
DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG, "100"
)
)
);
assertEquals(new Password("test"), config.delegationTokenSecretKey());
assertTrue(config.tokenAuthEnabled());
assertEquals(500L, config.delegationTokenMaxLifeMs());
assertEquals(200L, config.delegationTokenExpiryTimeMs());
assertEquals(100L, config.delegationTokenExpiryCheckIntervalMs());
}
@ParameterizedTest
@ValueSource(strings = {
DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG,
DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG
})
void testInvalidProperty(String field) {
assertThrows(Exception.class, () -> new DelegationTokenManagerConfigs(
new AbstractConfig(DelegationTokenManagerConfigs.CONFIG_DEF, Map.of(field, "not_a_number"))));
}
}