diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml index 293801bd75f..d5b787ce24f 100644 --- a/checkstyle/import-control-metadata.xml +++ b/checkstyle/import-control-metadata.xml @@ -161,6 +161,7 @@ + diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml index 8d85dffa341..21b13ed91d2 100644 --- a/checkstyle/import-control-server-common.xml +++ b/checkstyle/import-control-server-common.xml @@ -60,6 +60,7 @@ + diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 9af3492286e..dccd07b83c6 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec import org.apache.kafka.coordinator.transaction.ProducerIdManager import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher} import org.apache.kafka.metadata.{BrokerState, ListenerInfo} -import org.apache.kafka.metadata.publisher.AclPublisher +import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher} import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition} @@ -497,7 +497,7 @@ class BrokerServer( quotaManagers, ), new ScramPublisher( - config, + config.nodeId, sharedServer.metadataPublishingFaultHandler, "broker", credentialProvider), diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index e8427fa7e53..bfcc9ed7c0b 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager import kafka.server.QuotaFactory.QuotaManagers import scala.collection.immutable -import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher} +import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.internals.Plugin import org.apache.kafka.common.message.ApiMessageType.ListenerType @@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo} import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer import org.apache.kafka.metadata.bootstrap.BootstrapMetadata -import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher} +import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, ScramPublisher} import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager} @@ -350,7 +350,7 @@ class ControllerServer( // Set up the SCRAM publisher. metadataPublishers.add(new ScramPublisher( - config, + config.nodeId, sharedServer.metadataPublishingFaultHandler, "controller", credentialProvider diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 7876163f9ea..80a037ce4bd 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.publisher.MetadataPublisher import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta} -import org.apache.kafka.metadata.publisher.AclPublisher +import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher} import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion} import org.apache.kafka.server.fault.FaultHandler diff --git a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala deleted file mode 100644 index 818e01fa5f8..00000000000 --- a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server.metadata - -import kafka.server.KafkaConfig -import kafka.utils.Logging -import org.apache.kafka.image.loader.LoaderManifest -import org.apache.kafka.image.{MetadataDelta, MetadataImage} -import org.apache.kafka.security.CredentialProvider -import org.apache.kafka.server.fault.FaultHandler - - -class ScramPublisher( - conf: KafkaConfig, - faultHandler: FaultHandler, - nodeType: String, - credentialProvider: CredentialProvider, -) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { - logIdent = s"[${name()}] " - - override def name(): String = s"ScramPublisher $nodeType id=${conf.nodeId}" - - override def onMetadataUpdate( - delta: MetadataDelta, - newImage: MetadataImage, - manifest: LoaderManifest - ): Unit = { - onMetadataUpdate(delta, newImage) - } - - def onMetadataUpdate( - delta: MetadataDelta, - newImage: MetadataImage, - ): Unit = { - val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" - try { - // Apply changes to SCRAM credentials. - Option(delta.scramDelta()).foreach { scramDelta => - scramDelta.changes().forEach { - case (mechanism, userChanges) => - userChanges.forEach { - case (userName, change) => - if (change.isPresent) { - credentialProvider.updateCredential(mechanism, userName, change.get().toCredential) - } else { - credentialProvider.removeCredentials(mechanism, userName) - } - } - } - } - } catch { - case t: Throwable => faultHandler.handleFault("Uncaught exception while " + - s"publishing SCRAM changes from $deltaName", t) - } - } -} diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 3bb3f5fc3f7..828ca0d7ad4 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage} import org.apache.kafka.image.loader.LogDeltaManifest -import org.apache.kafka.metadata.publisher.AclPublisher +import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.LeaderAndEpoch import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/ScramPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/ScramPublisher.java new file mode 100644 index 00000000000..7a286eb6351 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/ScramPublisher.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata.publisher; + +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.ScramDelta; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.image.publisher.MetadataPublisher; +import org.apache.kafka.security.CredentialProvider; +import org.apache.kafka.server.fault.FaultHandler; + +public class ScramPublisher implements MetadataPublisher { + private final int nodeId; + private final FaultHandler faultHandler; + private final String nodeType; + private final CredentialProvider credentialProvider; + + public ScramPublisher(int nodeId, FaultHandler faultHandler, String nodeType, CredentialProvider credentialProvider) { + this.nodeId = nodeId; + this.faultHandler = faultHandler; + this.nodeType = nodeType; + this.credentialProvider = credentialProvider; + } + + @Override + public final String name() { + return "ScramPublisher " + nodeType + " id=" + nodeId; + } + + @Override + public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) { + onMetadataUpdate(delta, newImage); + } + + public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) { + String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset(); + try { + // Apply changes to SCRAM credentials. + ScramDelta scramDelta = delta.scramDelta(); + if (scramDelta != null) { + scramDelta.changes().forEach((mechanism, userChanges) -> { + userChanges.forEach((userName, change) -> { + if (change.isPresent()) + credentialProvider.updateCredential(mechanism, userName, change.get().toCredential()); + else + credentialProvider.removeCredentials(mechanism, userName); + }); + }); + } + } catch (Throwable t) { + faultHandler.handleFault("Uncaught exception while publishing SCRAM changes from " + deltaName, t); + } + } +} diff --git a/server/src/main/java/org/apache/kafka/security/CredentialProvider.java b/server-common/src/main/java/org/apache/kafka/security/CredentialProvider.java similarity index 100% rename from server/src/main/java/org/apache/kafka/security/CredentialProvider.java rename to server-common/src/main/java/org/apache/kafka/security/CredentialProvider.java