KAFKA-18704: Move ScramPublisher to metadata module

Signed-off-by: see-quick <maros.orsak159@gmail.com>
This commit is contained in:
see-quick 2025-09-03 15:39:55 +02:00
parent d226b43597
commit feee50f476
11 changed files with 89 additions and 85 deletions

View File

@ -44,7 +44,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.metrics.KafkaMetricsGroup

View File

@ -42,8 +42,8 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
@ -498,7 +498,7 @@ class BrokerServer(
quotaManagers,
),
new ScramPublisher(
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
credentialProvider),

View File

@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@ -38,9 +38,9 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
@ -350,7 +350,7 @@ class ControllerServer(
// Set up the SCRAM publisher.
metadataPublishers.add(new ScramPublisher(
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
credentialProvider

View File

@ -28,7 +28,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.{BrokerState, MetadataCache}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.NodeToControllerChannelManager
import org.apache.kafka.server.log.remote.storage.RemoteLogManager

View File

@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
import org.apache.kafka.server.fault.FaultHandler

View File

@ -1,71 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.metadata
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.fault.FaultHandler
class ScramPublisher(
conf: KafkaConfig,
faultHandler: FaultHandler,
nodeType: String,
credentialProvider: CredentialProvider,
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
override def name(): String = s"ScramPublisher $nodeType id=${conf.nodeId}"
override def onMetadataUpdate(
delta: MetadataDelta,
newImage: MetadataImage,
manifest: LoaderManifest
): Unit = {
onMetadataUpdate(delta, newImage)
}
def onMetadataUpdate(
delta: MetadataDelta,
newImage: MetadataImage,
): Unit = {
val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
try {
// Apply changes to SCRAM credentials.
Option(delta.scramDelta()).foreach { scramDelta =>
scramDelta.changes().forEach {
case (mechanism, userChanges) =>
userChanges.forEach {
case (userName, change) =>
if (change.isPresent) {
credentialProvider.updateCredential(mechanism, userName, change.get().toCredential)
} else {
credentialProvider.removeCredentials(mechanism, userName)
}
}
}
}
} catch {
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
s"publishing SCRAM changes from $deltaName", t)
}
}
}

View File

@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde

View File

@ -37,7 +37,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils._
import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.{ApiVersionManager, SimpleApiVersionManager}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.config.QuotaConfig

View File

@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.publisher;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.server.common.CredentialProvider;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
import java.util.Optional;
public class ScramPublisher implements MetadataPublisher {
private final Logger log;
private final int nodeId;
private final FaultHandler faultHandler;
private final String nodeType;
private final CredentialProvider credentialProvider;
public ScramPublisher(int nodeId, FaultHandler faultHandler, String nodeType, CredentialProvider credentialProvider) {
this.nodeId = nodeId;
this.faultHandler = faultHandler;
this.nodeType = nodeType;
this.credentialProvider = credentialProvider;
this.log = new LogContext(name()).logger(ScramPublisher.class);
}
@Override
public final String name() {
return "ScramPublisher " + nodeType + " id=" + nodeId;
}
@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
onMetadataUpdate(delta, newImage);
}
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
try {
// Apply changes to SCRAM credentials.
Optional.ofNullable(delta.scramDelta()).ifPresent(scramDelta -> {
scramDelta.changes().forEach((mechanism, userChanges) -> {
userChanges.forEach((userName, change) -> {
if (change.isPresent()) {
credentialProvider.updateCredential(mechanism, userName, change.get().toCredential());
} else {
credentialProvider.removeCredentials(mechanism, userName);
}
});
});
});
} catch (Throwable t) {
faultHandler.handleFault("Uncaught exception while publishing SCRAM changes from " + deltaName, t);
}
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.security;
package org.apache.kafka.server.common;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.security.authenticator.CredentialCache;