KAFKA-18590 Cleanup DelegationTokenManager (#18618)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
TengYao Chi 2025-01-24 20:12:03 +08:00 committed by GitHub
parent fa2df3bca7
commit 5d81fe20c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 12 additions and 155 deletions

View File

@ -355,7 +355,6 @@ class BrokerServer(
/* start token manager */
tokenManager = new DelegationTokenManager(config, tokenCache, time)
tokenManager.startup()
/* initializing the groupConfigManager */
groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))
@ -787,9 +786,6 @@ class BrokerServer(
if (shareCoordinator.isDefined)
CoreUtils.swallow(shareCoordinator.get.shutdown(), this)
if (tokenManager != null)
CoreUtils.swallow(tokenManager.shutdown(), this)
if (assignmentsManager != null)
CoreUtils.swallow(assignmentsManager.close(), this)

View File

@ -17,14 +17,11 @@
package kafka.server
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.InvalidKeyException
import javax.crypto.spec.SecretKeySpec
import javax.crypto.{Mac, SecretKey}
import kafka.utils.Logging
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.{ScramFormatter, ScramMechanism}
import org.apache.kafka.common.security.scram.ScramCredential
@ -32,24 +29,15 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.Time
import java.util
import java.util.stream.Collectors
import scala.jdk.CollectionConverters._
import scala.collection.mutable
object DelegationTokenManager {
private val DefaultHmacAlgorithm = "HmacSHA512"
val CurrentVersion = 3
val ErrorTimestamp = -1
/**
*
* @param tokenId
* @param secretKey
* @return
*/
def createHmac(tokenId: String, secretKey: String) : Array[Byte] = {
createHmac(tokenId, createSecretKey(secretKey.getBytes(StandardCharsets.UTF_8)))
}
/**
* Convert the byte[] to a secret key
* @param keybytes the byte[] to create the secret key from
@ -102,14 +90,8 @@ class DelegationTokenManager(val config: KafkaConfig,
val time: Time) extends Logging {
this.logIdent = s"[Token Manager on Node ${config.brokerId}]: "
protected val lock = new Object()
import DelegationTokenManager._
type CreateResponseCallback = CreateTokenResult => Unit
type RenewResponseCallback = (Errors, Long) => Unit
type ExpireResponseCallback = (Errors, Long) => Unit
val secretKey: SecretKey = {
val keyBytes = if (config.tokenAuthEnabled) config.delegationTokenSecretKey.value.getBytes(StandardCharsets.UTF_8) else null
if (keyBytes == null || keyBytes.isEmpty) null
@ -117,26 +99,6 @@ class DelegationTokenManager(val config: KafkaConfig,
createSecretKey(keyBytes)
}
val tokenMaxLifetime: Long = config.delegationTokenMaxLifeMs
val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
def startup(): Unit = {
// Nothing to do. Overridden for Zk case
}
def shutdown(): Unit = {
// Nothing to do. Overridden for Zk case
}
/**
*
* @param token
*/
protected def updateCache(token: DelegationToken): Unit = {
val hmacString = token.hmacAsBase64String
val scramCredentialMap = prepareScramCredentials(hmacString)
tokenCache.updateCache(token, scramCredentialMap.asJava)
}
/**
* @param hmacString
*/
@ -157,38 +119,9 @@ class DelegationTokenManager(val config: KafkaConfig,
* @param token
*/
def updateToken(token: DelegationToken): Unit = {
updateCache(token)
}
/**
*
* @param owner
* @param renewers
* @param maxLifeTimeMs
* @param responseCallback
*/
def createToken(owner: KafkaPrincipal,
tokenRequester: KafkaPrincipal,
renewers: List[KafkaPrincipal],
maxLifeTimeMs: Long,
responseCallback: CreateResponseCallback): Unit = {
// Must be forwarded to KRaft Controller or handled in DelegationTokenManagerZk
throw new IllegalStateException("API createToken was not forwarded to a handler.")
}
/**
*
* @param principal
* @param hmac
* @param renewLifeTimeMs
* @param renewCallback
*/
def renewToken(principal: KafkaPrincipal,
hmac: ByteBuffer,
renewLifeTimeMs: Long,
renewCallback: RenewResponseCallback): Unit = {
// Must be forwarded to KRaft Controller or handled in DelegationTokenManagerZk
throw new IllegalStateException("API renewToken was not forwarded to a handler.")
val hmacString = token.hmacAsBase64String
val scramCredentialMap = prepareScramCredentials(hmacString)
tokenCache.updateCache(token, scramCredentialMap.asJava)
}
def getDelegationToken(tokenInfo: TokenInformation): DelegationToken = {
@ -196,87 +129,15 @@ class DelegationTokenManager(val config: KafkaConfig,
new DelegationToken(tokenInfo, hmac)
}
/**
*
* @param principal
* @param hmac
* @param expireLifeTimeMs
* @param expireResponseCallback
*/
def expireToken(principal: KafkaPrincipal,
hmac: ByteBuffer,
expireLifeTimeMs: Long,
expireResponseCallback: ExpireResponseCallback): Unit = {
// Must be forwarded to KRaft Controller or handled in DelegationTokenManagerZk
throw new IllegalStateException("API expireToken was not forwarded to a handler.")
}
/**
*
* @param tokenId
*/
def removeToken(tokenId: String): Unit = {
removeCache(tokenId)
}
/**
*
* @param tokenId
*/
protected def removeCache(tokenId: String): Unit = {
tokenCache.removeCache(tokenId)
}
/**
*
* @return
*/
def expireTokens(): Unit = {
lock.synchronized {
for (tokenInfo <- getAllTokenInformation) {
val now = time.milliseconds
if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
info(s"Delegation token expired for token: ${tokenInfo.tokenId} for owner: ${tokenInfo.owner}")
removeToken(tokenInfo.tokenId)
}
}
}
}
def getAllTokenInformation: List[TokenInformation] = tokenCache.tokens.asScala.toList
def getTokens(filterToken: TokenInformation => Boolean): List[DelegationToken] = {
getAllTokenInformation.filter(filterToken).map(token => getDelegationToken(token))
}
}
case class CreateTokenResult(owner: KafkaPrincipal,
tokenRequester: KafkaPrincipal,
issueTimestamp: Long,
expiryTimestamp: Long,
maxTimestamp: Long,
tokenId: String,
hmac: Array[Byte],
error: Errors) {
override def equals(other: Any): Boolean = {
other match {
case that: CreateTokenResult =>
error.equals(that.error) &&
owner.equals(that.owner) &&
tokenRequester.equals(that.tokenRequester) &&
tokenId.equals(that.tokenId) &&
issueTimestamp.equals(that.issueTimestamp) &&
expiryTimestamp.equals(that.expiryTimestamp) &&
maxTimestamp.equals(that.maxTimestamp) &&
(hmac sameElements that.hmac)
case _ => false
}
}
override def hashCode(): Int = {
val fields = Seq(owner, tokenRequester, issueTimestamp, expiryTimestamp, maxTimestamp, tokenId, hmac, error)
fields.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
def getTokens(filterToken: TokenInformation => Boolean): util.List[DelegationToken] = {
tokenCache.tokens.stream().filter(token => filterToken(token)).map(getDelegationToken).collect(Collectors.toList[DelegationToken])
}
}

View File

@ -2247,22 +2247,22 @@ class KafkaApis(val requestChannel: RequestChannel,
val describeTokenRequest = request.body[DescribeDelegationTokenRequest]
// the callback for sending a describe token response
def sendResponseCallback(error: Errors, tokenDetails: List[DelegationToken]): Unit = {
def sendResponseCallback(error: Errors, tokenDetails: util.List[DelegationToken]): Unit = {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeDelegationTokenResponse(request.context.requestVersion(), requestThrottleMs, error, tokenDetails.asJava))
new DescribeDelegationTokenResponse(request.context.requestVersion(), requestThrottleMs, error, tokenDetails))
trace("Sending describe token response for correlation id %d to client %s."
.format(request.header.correlationId, request.header.clientId))
}
if (!allowTokenRequests(request))
sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, List.empty)
sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, Collections.emptyList)
else if (!config.tokenAuthEnabled)
sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, List.empty)
sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, Collections.emptyList)
else {
val requestPrincipal = request.context.principal
if (describeTokenRequest.ownersListEmpty()) {
sendResponseCallback(Errors.NONE, List())
sendResponseCallback(Errors.NONE, Collections.emptyList)
}
else {
val owners = if (describeTokenRequest.data.owners == null)