diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala index 24947cde26c..f08db242944 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala @@ -22,13 +22,14 @@ import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE, IDEMPOTENT_WRITE} import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, TopicConfig} -import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.errors.{ClusterAuthorizationException, DelegationTokenExpiredException, DelegationTokenNotFoundException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException} import org.apache.kafka.common.resource.PatternType.LITERAL import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.security.token.delegation.DelegationToken import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING} -import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs} +import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs, ZkConfigs} import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.storage.internals.log.LogConfig import org.junit.jupiter.api.Assertions._ @@ -67,6 +68,10 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) } + // Enable delegationTokenControlManager + serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG, "123") + serverConfig.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, "5000") + setUpSasl() super.setUp(testInfo) setInitialAcls() @@ -520,6 +525,50 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu } } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testExpireDelegationToken(quorum: String): Unit = { + client = createAdminClient + val createDelegationTokenOptions = new CreateDelegationTokenOptions() + + // Test expiration for non-exists token + TestUtils.assertFutureExceptionTypeEquals( + client.expireDelegationToken("".getBytes()).expiryTimestamp(), + classOf[DelegationTokenNotFoundException] + ) + + // Test expiring the token immediately + val token1 = client.createDelegationToken(createDelegationTokenOptions).delegationToken().get() + TestUtils.retry(maxWaitMs = 1000) { assertTrue(expireTokenOrFailWithAssert(token1, -1) < System.currentTimeMillis()) } + + // Test expiring the expired token + val token2 = client.createDelegationToken(createDelegationTokenOptions.maxlifeTimeMs(1000)).delegationToken().get() + // Ensure current time > maxLifeTimeMs of token + Thread.sleep(1000) + TestUtils.assertFutureExceptionTypeEquals( + client.expireDelegationToken(token2.hmac(), new ExpireDelegationTokenOptions().expiryTimePeriodMs(1)).expiryTimestamp(), + classOf[DelegationTokenExpiredException] + ) + + // Ensure expiring the expired token with negative expiryTimePeriodMs will not throw exception + assertDoesNotThrow(() => expireTokenOrFailWithAssert(token2, -1)) + + // Test shortening the expiryTimestamp + val token3 = client.createDelegationToken(createDelegationTokenOptions).delegationToken().get() + TestUtils.retry(1000) { assertTrue(expireTokenOrFailWithAssert(token3, 200) < token3.tokenInfo().expiryTimestamp()) } + } + + private def expireTokenOrFailWithAssert(token: DelegationToken, expiryTimePeriodMs: Long): Long = { + try { + client.expireDelegationToken(token.hmac(), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)) + .expiryTimestamp().get() + } catch { + // If metadata is not synced yet, the response will contain an errorCode, causing an exception to be thrown. + // This wrapper is designed to work with TestUtils.retry + case _: ExecutionException => throw new AssertionError("Metadata not sync yet.") + } + } + private def describeConfigs(topic: String): Iterable[ConfigEntry] = { val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic) var configEntries: Iterable[ConfigEntry] = null diff --git a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java index fa50570a48f..04a69a1e36a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java @@ -302,10 +302,6 @@ public class DelegationTokenControlManager { return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_NOT_FOUND.code())); } - if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) { - return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code())); - } - if (!allowedToRenew(myTokenInformation, context.principal())) { return ControllerResult.atomicOf(records, responseData.setErrorCode(DELEGATION_TOKEN_OWNER_MISMATCH.code())); } @@ -313,9 +309,11 @@ public class DelegationTokenControlManager { if (requestData.expiryTimePeriodMs() < 0) { // expire immediately responseData .setErrorCode(NONE.code()) - .setExpiryTimestampMs(requestData.expiryTimePeriodMs()); + .setExpiryTimestampMs(now); records.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord(). setTokenId(myTokenInformation.tokenId()), (short) 0)); + } else if (myTokenInformation.maxTimestamp() < now || myTokenInformation.expiryTimestamp() < now) { + responseData.setErrorCode(DELEGATION_TOKEN_EXPIRED.code()); } else { long expiryTimestamp = Math.min(myTokenInformation.maxTimestamp(), now + requestData.expiryTimePeriodMs());