mirror of https://github.com/apache/kafka.git
KAFKA-18708: Move ScramPublisher to metadata module (#20468)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
parent
32b8e326da
commit
a244565ed2
|
@ -161,6 +161,7 @@
|
||||||
<allow pkg="org.apache.kafka.metadata" />
|
<allow pkg="org.apache.kafka.metadata" />
|
||||||
<allow pkg="org.apache.kafka.queue" />
|
<allow pkg="org.apache.kafka.queue" />
|
||||||
<allow pkg="org.apache.kafka.raft" />
|
<allow pkg="org.apache.kafka.raft" />
|
||||||
|
<allow pkg="org.apache.kafka.security" />
|
||||||
<allow pkg="org.apache.kafka.server.authorizer" />
|
<allow pkg="org.apache.kafka.server.authorizer" />
|
||||||
<allow pkg="org.apache.kafka.server.common" />
|
<allow pkg="org.apache.kafka.server.common" />
|
||||||
<allow pkg="org.apache.kafka.server.fault" />
|
<allow pkg="org.apache.kafka.server.fault" />
|
||||||
|
|
|
@ -60,6 +60,7 @@
|
||||||
</subpackage>
|
</subpackage>
|
||||||
|
|
||||||
<subpackage name="security">
|
<subpackage name="security">
|
||||||
|
<allow pkg="org.apache.kafka.clients.admin" />
|
||||||
<allow pkg="org.apache.kafka.common.config" />
|
<allow pkg="org.apache.kafka.common.config" />
|
||||||
<allow pkg="org.apache.kafka.common.config.types" />
|
<allow pkg="org.apache.kafka.common.config.types" />
|
||||||
<allow pkg="org.apache.kafka.server.util" />
|
<allow pkg="org.apache.kafka.server.util" />
|
||||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
|
||||||
import org.apache.kafka.coordinator.transaction.ProducerIdManager
|
import org.apache.kafka.coordinator.transaction.ProducerIdManager
|
||||||
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
|
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
|
||||||
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
|
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
|
||||||
import org.apache.kafka.metadata.publisher.AclPublisher
|
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
|
||||||
import org.apache.kafka.security.CredentialProvider
|
import org.apache.kafka.security.CredentialProvider
|
||||||
import org.apache.kafka.server.authorizer.Authorizer
|
import org.apache.kafka.server.authorizer.Authorizer
|
||||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
|
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
|
||||||
|
@ -497,7 +497,7 @@ class BrokerServer(
|
||||||
quotaManagers,
|
quotaManagers,
|
||||||
),
|
),
|
||||||
new ScramPublisher(
|
new ScramPublisher(
|
||||||
config,
|
config.nodeId,
|
||||||
sharedServer.metadataPublishingFaultHandler,
|
sharedServer.metadataPublishingFaultHandler,
|
||||||
"broker",
|
"broker",
|
||||||
credentialProvider),
|
credentialProvider),
|
||||||
|
|
|
@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
|
||||||
import kafka.server.QuotaFactory.QuotaManagers
|
import kafka.server.QuotaFactory.QuotaManagers
|
||||||
|
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
|
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
|
||||||
import kafka.utils.{CoreUtils, Logging}
|
import kafka.utils.{CoreUtils, Logging}
|
||||||
import org.apache.kafka.common.internals.Plugin
|
import org.apache.kafka.common.internals.Plugin
|
||||||
import org.apache.kafka.common.message.ApiMessageType.ListenerType
|
import org.apache.kafka.common.message.ApiMessageType.ListenerType
|
||||||
|
@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
|
||||||
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
|
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
|
||||||
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
|
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
|
||||||
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
|
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
|
||||||
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
|
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, ScramPublisher}
|
||||||
import org.apache.kafka.raft.QuorumConfig
|
import org.apache.kafka.raft.QuorumConfig
|
||||||
import org.apache.kafka.security.CredentialProvider
|
import org.apache.kafka.security.CredentialProvider
|
||||||
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
|
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
|
||||||
|
@ -350,7 +350,7 @@ class ControllerServer(
|
||||||
|
|
||||||
// Set up the SCRAM publisher.
|
// Set up the SCRAM publisher.
|
||||||
metadataPublishers.add(new ScramPublisher(
|
metadataPublishers.add(new ScramPublisher(
|
||||||
config,
|
config.nodeId,
|
||||||
sharedServer.metadataPublishingFaultHandler,
|
sharedServer.metadataPublishingFaultHandler,
|
||||||
"controller",
|
"controller",
|
||||||
credentialProvider
|
credentialProvider
|
||||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
|
||||||
import org.apache.kafka.image.loader.LoaderManifest
|
import org.apache.kafka.image.loader.LoaderManifest
|
||||||
import org.apache.kafka.image.publisher.MetadataPublisher
|
import org.apache.kafka.image.publisher.MetadataPublisher
|
||||||
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
|
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
|
||||||
import org.apache.kafka.metadata.publisher.AclPublisher
|
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
|
||||||
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
|
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
|
||||||
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
|
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
|
||||||
import org.apache.kafka.server.fault.FaultHandler
|
import org.apache.kafka.server.fault.FaultHandler
|
||||||
|
|
|
@ -1,71 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package kafka.server.metadata
|
|
||||||
|
|
||||||
import kafka.server.KafkaConfig
|
|
||||||
import kafka.utils.Logging
|
|
||||||
import org.apache.kafka.image.loader.LoaderManifest
|
|
||||||
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
|
|
||||||
import org.apache.kafka.security.CredentialProvider
|
|
||||||
import org.apache.kafka.server.fault.FaultHandler
|
|
||||||
|
|
||||||
|
|
||||||
class ScramPublisher(
|
|
||||||
conf: KafkaConfig,
|
|
||||||
faultHandler: FaultHandler,
|
|
||||||
nodeType: String,
|
|
||||||
credentialProvider: CredentialProvider,
|
|
||||||
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
|
|
||||||
logIdent = s"[${name()}] "
|
|
||||||
|
|
||||||
override def name(): String = s"ScramPublisher $nodeType id=${conf.nodeId}"
|
|
||||||
|
|
||||||
override def onMetadataUpdate(
|
|
||||||
delta: MetadataDelta,
|
|
||||||
newImage: MetadataImage,
|
|
||||||
manifest: LoaderManifest
|
|
||||||
): Unit = {
|
|
||||||
onMetadataUpdate(delta, newImage)
|
|
||||||
}
|
|
||||||
|
|
||||||
def onMetadataUpdate(
|
|
||||||
delta: MetadataDelta,
|
|
||||||
newImage: MetadataImage,
|
|
||||||
): Unit = {
|
|
||||||
val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
|
|
||||||
try {
|
|
||||||
// Apply changes to SCRAM credentials.
|
|
||||||
Option(delta.scramDelta()).foreach { scramDelta =>
|
|
||||||
scramDelta.changes().forEach {
|
|
||||||
case (mechanism, userChanges) =>
|
|
||||||
userChanges.forEach {
|
|
||||||
case (userName, change) =>
|
|
||||||
if (change.isPresent) {
|
|
||||||
credentialProvider.updateCredential(mechanism, userName, change.get().toCredential)
|
|
||||||
} else {
|
|
||||||
credentialProvider.removeCredentials(mechanism, userName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
|
|
||||||
s"publishing SCRAM changes from $deltaName", t)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
|
||||||
import org.apache.kafka.coordinator.share.ShareCoordinator
|
import org.apache.kafka.coordinator.share.ShareCoordinator
|
||||||
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
|
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
|
||||||
import org.apache.kafka.image.loader.LogDeltaManifest
|
import org.apache.kafka.image.loader.LogDeltaManifest
|
||||||
import org.apache.kafka.metadata.publisher.AclPublisher
|
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
|
||||||
import org.apache.kafka.network.SocketServerConfigs
|
import org.apache.kafka.network.SocketServerConfigs
|
||||||
import org.apache.kafka.raft.LeaderAndEpoch
|
import org.apache.kafka.raft.LeaderAndEpoch
|
||||||
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}
|
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.metadata.publisher;
|
||||||
|
|
||||||
|
import org.apache.kafka.image.MetadataDelta;
|
||||||
|
import org.apache.kafka.image.MetadataImage;
|
||||||
|
import org.apache.kafka.image.ScramDelta;
|
||||||
|
import org.apache.kafka.image.loader.LoaderManifest;
|
||||||
|
import org.apache.kafka.image.publisher.MetadataPublisher;
|
||||||
|
import org.apache.kafka.security.CredentialProvider;
|
||||||
|
import org.apache.kafka.server.fault.FaultHandler;
|
||||||
|
|
||||||
|
public class ScramPublisher implements MetadataPublisher {
|
||||||
|
private final int nodeId;
|
||||||
|
private final FaultHandler faultHandler;
|
||||||
|
private final String nodeType;
|
||||||
|
private final CredentialProvider credentialProvider;
|
||||||
|
|
||||||
|
public ScramPublisher(int nodeId, FaultHandler faultHandler, String nodeType, CredentialProvider credentialProvider) {
|
||||||
|
this.nodeId = nodeId;
|
||||||
|
this.faultHandler = faultHandler;
|
||||||
|
this.nodeType = nodeType;
|
||||||
|
this.credentialProvider = credentialProvider;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final String name() {
|
||||||
|
return "ScramPublisher " + nodeType + " id=" + nodeId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
|
||||||
|
onMetadataUpdate(delta, newImage);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
|
||||||
|
String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
|
||||||
|
try {
|
||||||
|
// Apply changes to SCRAM credentials.
|
||||||
|
ScramDelta scramDelta = delta.scramDelta();
|
||||||
|
if (scramDelta != null) {
|
||||||
|
scramDelta.changes().forEach((mechanism, userChanges) -> {
|
||||||
|
userChanges.forEach((userName, change) -> {
|
||||||
|
if (change.isPresent())
|
||||||
|
credentialProvider.updateCredential(mechanism, userName, change.get().toCredential());
|
||||||
|
else
|
||||||
|
credentialProvider.removeCredentials(mechanism, userName);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
faultHandler.handleFault("Uncaught exception while publishing SCRAM changes from " + deltaName, t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue