KAFKA-18711 Move DelegationTokenPublisher to metadata module (#20475)

Basically, one of the refactor tasks. In this PR, I have moved
`DelegationTokenPublisher` to the metadata module. Similar to the
`ScramPublisher` migration (commit feee50f476), I have moved
`DelegationTokenManager` to the server-common module, as it would
otherwise create a circular dependency. Moreover, I have made multiple
changes throughout the codebase to reference `DelegationTokenManager`
from server-common instead of the server module.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Maros Orsak 2025-09-24 14:19:08 +02:00 committed by GitHub
parent 8036e49a6e
commit 486b991f22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 97 additions and 100 deletions

View File

@ -33,6 +33,7 @@
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="net.jqwik.api" />
<allow pkg="javax.crypto" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
@ -49,6 +50,9 @@
<!-- persistent collection factories/non-library-specific wrappers -->
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
<!-- allow config classes for server package -->
<allow pkg="org.apache.kafka.server.config" />
<subpackage name="queue">
<allow pkg="org.apache.kafka.test" />
</subpackage>

View File

@ -36,9 +36,9 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.coordinator.share.ShareCoordinator;
import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.security.DelegationTokenManager;
import org.apache.kafka.server.ApiVersionManager;
import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.DelegationTokenManager;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;

View File

@ -42,8 +42,8 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
@ -54,7 +54,7 @@ import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, ProcessRole}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@ -502,7 +502,7 @@ class BrokerServer(
"broker",
credentialProvider),
new DelegationTokenPublisher(
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
tokenManager),

View File

@ -55,7 +55,8 @@ import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.RaftManager
import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, ProcessRole}
import org.apache.kafka.security.DelegationTokenManager
import org.apache.kafka.server.{ApiVersionManager, ProcessRole}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
import org.apache.kafka.server.quota.ControllerMutationQuota

View File

@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@ -38,14 +38,15 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, NodeToControllerChannelManager}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.config.ConfigType
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@ -360,7 +361,7 @@ class ControllerServer(
// We need a tokenManager for the Publisher
// The tokenCache in the tokenManager is the same used in DelegationTokenControlManager
metadataPublishers.add(new DelegationTokenPublisher(
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)

View File

@ -60,7 +60,8 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupConfig, GroupConfigManager, GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, DelegationTokenManager, ProcessRole}
import org.apache.kafka.security.DelegationTokenManager
import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, ProcessRole}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion}
import org.apache.kafka.server.config.DelegationTokenManagerConfigs

View File

@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
@ -227,7 +227,7 @@ class BrokerMetadataPublisher(
scramPublisher.onMetadataUpdate(delta, newImage, manifest)
// Apply DelegationToken delta.
delegationTokenPublisher.onMetadataUpdate(delta, newImage)
delegationTokenPublisher.onMetadataUpdate(delta, newImage, manifest)
// Apply ACL delta.
aclPublisher.onMetadataUpdate(delta, newImage, manifest)

View File

@ -1,83 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.DelegationTokenManager
import org.apache.kafka.server.fault.FaultHandler
class DelegationTokenPublisher(
conf: KafkaConfig,
faultHandler: FaultHandler,
nodeType: String,
tokenManager: DelegationTokenManager,
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
var _firstPublish = true
override def name(): String = s"DelegationTokenPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
newImage: MetadataImage,
manifest: LoaderManifest
): Unit = {
onMetadataUpdate(delta, newImage)
}
def onMetadataUpdate(
delta: MetadataDelta,
newImage: MetadataImage,
): Unit = {
val deltaName = if (_firstPublish) {
s"initial MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
} else {
s"update MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
}
try {
if (_firstPublish) {
// Initialize the tokenCache with the Image
Option(newImage.delegationTokens()).foreach { delegationTokenImage =>
delegationTokenImage.tokens().forEach { (_, delegationTokenData) =>
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation()))
}
}
_firstPublish = false
}
// Apply changes to DelegationTokens.
Option(delta.delegationTokenDelta()).foreach { delegationTokenDelta =>
delegationTokenDelta.changes().forEach {
case (tokenId, delegationTokenData) =>
if (delegationTokenData.isPresent) {
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.get().tokenInformation()))
} else {
tokenManager.removeToken(tokenId)
}
}
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
s"publishing DelegationToken changes from $deltaName", t)
}
}
}

View File

@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.publisher;
import org.apache.kafka.image.DelegationTokenImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.security.DelegationTokenManager;
import org.apache.kafka.server.fault.FaultHandler;
public class DelegationTokenPublisher implements MetadataPublisher {
private final int nodeId;
private final FaultHandler faultHandler;
private final String nodeType;
private final DelegationTokenManager tokenManager;
private boolean firstPublish = true;
public DelegationTokenPublisher(int nodeId, FaultHandler faultHandler, String nodeType, DelegationTokenManager tokenManager) {
this.nodeId = nodeId;
this.faultHandler = faultHandler;
this.nodeType = nodeType;
this.tokenManager = tokenManager;
}
@Override
public final String name() {
return "DelegationTokenPublisher " + nodeType + " id=" + nodeId;
}
@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
var first = firstPublish;
try {
if (firstPublish) {
// Initialize the tokenCache with the Image
DelegationTokenImage delegationTokenImage = newImage.delegationTokens();
for (var token : delegationTokenImage.tokens().entrySet()) {
tokenManager.updateToken(tokenManager.getDelegationToken(token.getValue().tokenInformation()));
}
firstPublish = false;
}
// Apply changes to DelegationTokens.
for (var token : delta.getOrCreateDelegationTokenDelta().changes().entrySet()) {
var tokenId = token.getKey();
var delegationTokenData = token.getValue();
if (delegationTokenData.isPresent())
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.get().tokenInformation()));
else
tokenManager.removeToken(tokenId);
}
} catch (Throwable t) {
var msg = String.format("Uncaught exception while publishing DelegationToken changes from %s MetadataDelta up to %s",
first ? "initial" : "update", newImage.highestOffsetAndEpoch().offset());
faultHandler.handleFault(msg, t);
}
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
package org.apache.kafka.security;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.scram.ScramCredential;