mirror of https://github.com/apache/kafka.git
KAFKA-17315 Fix the behavior of delegation tokens that expire immediately upon creation in KRaft mode (#16858)
In kraft mode, expiring delegation token (`expiryTimePeriodMs` < 0) has following different behavior to zk mode. 1. `ExpiryTimestampMs` is set to "expiryTimePeriodMs" [0] rather than "now" [1] 2. it throws exception directly if the token is expired already [2]. By contrast, zk mode does not. [3] [0]49fc14f611/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java (L316)
[1]49fc14f611/core/src/main/scala/kafka/server/DelegationTokenManagerZk.scala (L292)
[2]49fc14f611/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java (L305)
[3]49fc14f611/core/src/main/scala/kafka/server/DelegationTokenManagerZk.scala (L293)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
e750f44cf8
commit
4a485ddb71
|
@ -22,13 +22,14 @@ import org.apache.kafka.common.acl._
|
||||||
import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE}
|
import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE}
|
||||||
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
|
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
|
||||||
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, TopicConfig}
|
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, TopicConfig}
|
||||||
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException}
|
import org.apache.kafka.common.errors.{ClusterAuthorizationException, DelegationTokenExpiredException, DelegationTokenNotFoundException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException}
|
||||||
import org.apache.kafka.common.resource.PatternType.LITERAL
|
import org.apache.kafka.common.resource.PatternType.LITERAL
|
||||||
import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
|
import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
|
||||||
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
|
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
|
||||||
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
|
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
|
||||||
|
import org.apache.kafka.common.security.token.delegation.DelegationToken
|
||||||
import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING}
|
import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING}
|
||||||
import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs}
|
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs, ZkConfigs}
|
||||||
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
|
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
|
||||||
import org.apache.kafka.storage.internals.log.LogConfig
|
import org.apache.kafka.storage.internals.log.LogConfig
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
|
@ -67,6 +68,10 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
||||||
this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
|
this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Enable delegationTokenControlManager
|
||||||
|
serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, "123")
|
||||||
|
serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, "5000")
|
||||||
|
|
||||||
setUpSasl()
|
setUpSasl()
|
||||||
super.setUp(testInfo)
|
super.setUp(testInfo)
|
||||||
setInitialAcls()
|
setInitialAcls()
|
||||||
|
@ -520,6 +525,50 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(strings = Array("zk", "kraft"))
|
||||||
|
def testExpireDelegationToken(quorum: String): Unit = {
|
||||||
|
client = createAdminClient
|
||||||
|
val createDelegationTokenOptions = new CreateDelegationTokenOptions()
|
||||||
|
|
||||||
|
// Test expiration for non-exists token
|
||||||
|
TestUtils.assertFutureExceptionTypeEquals(
|
||||||
|
client.expireDelegationToken("".getBytes()).expiryTimestamp(),
|
||||||
|
classOf[DelegationTokenNotFoundException]
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test expiring the token immediately
|
||||||
|
val token1 = client.createDelegationToken(createDelegationTokenOptions).delegationToken().get()
|
||||||
|
TestUtils.retry(maxWaitMs = 1000) { assertTrue(expireTokenOrFailWithAssert(token1, -1) < System.currentTimeMillis()) }
|
||||||
|
|
||||||
|
// Test expiring the expired token
|
||||||
|
val token2 = client.createDelegationToken(createDelegationTokenOptions.maxlifeTimeMs(1000)).delegationToken().get()
|
||||||
|
// Ensure current time > maxLifeTimeMs of token
|
||||||
|
Thread.sleep(1000)
|
||||||
|
TestUtils.assertFutureExceptionTypeEquals(
|
||||||
|
client.expireDelegationToken(token2.hmac(), new ExpireDelegationTokenOptions().expiryTimePeriodMs(1)).expiryTimestamp(),
|
||||||
|
classOf[DelegationTokenExpiredException]
|
||||||
|
)
|
||||||
|
|
||||||
|
// Ensure expiring the expired token with negative expiryTimePeriodMs will not throw exception
|
||||||
|
assertDoesNotThrow(() => expireTokenOrFailWithAssert(token2, -1))
|
||||||
|
|
||||||
|
// Test shortening the expiryTimestamp
|
||||||
|
val token3 = client.createDelegationToken(createDelegationTokenOptions).delegationToken().get()
|
||||||
|
TestUtils.retry(1000) { assertTrue(expireTokenOrFailWithAssert(token3, 200) < token3.tokenInfo().expiryTimestamp()) }
|
||||||
|
}
|
||||||
|
|
||||||
|
private def expireTokenOrFailWithAssert(token: DelegationToken, expiryTimePeriodMs: Long): Long = {
|
||||||
|
try {
|
||||||
|
client.expireDelegationToken(token.hmac(), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
|
||||||
|
.expiryTimestamp().get()
|
||||||
|
} catch {
|
||||||
|
// If metadata is not synced yet, the response will contain an errorCode, causing an exception to be thrown.
|
||||||
|
// This wrapper is designed to work with TestUtils.retry
|
||||||
|
case _: ExecutionException => throw new AssertionError("Metadata not sync yet.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private def describeConfigs(topic: String): Iterable[ConfigEntry] = {
|
private def describeConfigs(topic: String): Iterable[ConfigEntry] = {
|
||||||
val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
|
val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
|
||||||
var configEntries: Iterable[ConfigEntry] = null
|
var configEntries: Iterable[ConfigEntry] = null
|
||||||
|
|
|
@ -302,10 +302,6 @@ public class DelegationTokenControlManager {
|
||||||
return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
|
return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code()));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) {
|
|
||||||
return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!allowedToRenew(myTokenInformation, context.principal())) {
|
if (!allowedToRenew(myTokenInformation, context.principal())) {
|
||||||
return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
|
return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code()));
|
||||||
}
|
}
|
||||||
|
@ -313,9 +309,11 @@ public class DelegationTokenControlManager {
|
||||||
if (requestData.expiryTimePeriodMs() < 0) { // expire immediately
|
if (requestData.expiryTimePeriodMs() < 0) { // expire immediately
|
||||||
responseData
|
responseData
|
||||||
.setErrorCode(NONE.code())
|
.setErrorCode(NONE.code())
|
||||||
.setExpiryTimestampMs(requestData.expiryTimePeriodMs());
|
.setExpiryTimestampMs(now);
|
||||||
records.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord().
|
records.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord().
|
||||||
setTokenId(myTokenInformation.tokenId()), (short) 0));
|
setTokenId(myTokenInformation.tokenId()), (short) 0));
|
||||||
|
} else if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) {
|
||||||
|
responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code());
|
||||||
} else {
|
} else {
|
||||||
long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(),
|
long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(),
|
||||||
now + requestData.expiryTimePeriodMs());
|
now + requestData.expiryTimePeriodMs());
|
||||||
|
|
Loading…
Reference in New Issue