mirror of https://github.com/apache/kafka.git
KAFKA-18706 Move AclPublisher to metadata module (#18802)
Move AclPublisher to org.apache.kafka.metadata.publisher package. Reviewers: Christo Lolov <lolovc@amazon.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
1584d49470
commit
a5e5e2dcd5
|
|
@ -42,6 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
|
|||
import org.apache.kafka.coordinator.transaction.ProducerIdManager
|
||||
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
|
||||
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
|
||||
import org.apache.kafka.metadata.publisher.AclPublisher
|
||||
import org.apache.kafka.security.CredentialProvider
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
|
||||
|
|
@ -528,7 +529,7 @@ class BrokerServer(
|
|||
config.nodeId,
|
||||
sharedServer.metadataPublishingFaultHandler,
|
||||
"broker",
|
||||
authorizer
|
||||
authorizer.toJava
|
||||
),
|
||||
sharedServer.initialBrokerMetadataLoadFaultHandler,
|
||||
sharedServer.metadataPublishingFaultHandler
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
|
|||
import kafka.server.QuotaFactory.QuotaManagers
|
||||
|
||||
import scala.collection.immutable
|
||||
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
|
||||
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
|
||||
import kafka.utils.{CoreUtils, Logging}
|
||||
import org.apache.kafka.common.message.ApiMessageType.ListenerType
|
||||
import org.apache.kafka.common.network.ListenerName
|
||||
|
|
@ -37,7 +37,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
|
|||
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
|
||||
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
|
||||
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
|
||||
import org.apache.kafka.metadata.publisher.FeaturesPublisher
|
||||
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
|
||||
import org.apache.kafka.raft.QuorumConfig
|
||||
import org.apache.kafka.security.CredentialProvider
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
|
|
@ -375,7 +375,7 @@ class ControllerServer(
|
|||
config.nodeId,
|
||||
sharedServer.metadataPublishingFaultHandler,
|
||||
"controller",
|
||||
authorizer
|
||||
authorizer.toJava
|
||||
))
|
||||
|
||||
// Install all metadata publishers.
|
||||
|
|
|
|||
|
|
@ -1,102 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.server.metadata
|
||||
|
||||
import kafka.utils.Logging
|
||||
import org.apache.kafka.image.loader.{LoaderManifest, LoaderManifestType}
|
||||
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
|
||||
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
|
||||
import org.apache.kafka.server.authorizer.Authorizer
|
||||
import org.apache.kafka.server.fault.FaultHandler
|
||||
|
||||
import scala.concurrent.TimeoutException
|
||||
|
||||
|
||||
class AclPublisher(
|
||||
nodeId: Int,
|
||||
faultHandler: FaultHandler,
|
||||
nodeType: String,
|
||||
authorizer: Option[Authorizer],
|
||||
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
|
||||
logIdent = s"[${name()}] "
|
||||
|
||||
override def name(): String = s"AclPublisher $nodeType id=$nodeId"
|
||||
|
||||
private var completedInitialLoad = false
|
||||
|
||||
override def onMetadataUpdate(
|
||||
delta: MetadataDelta,
|
||||
newImage: MetadataImage,
|
||||
manifest: LoaderManifest
|
||||
): Unit = {
|
||||
val deltaName = s"MetadataDelta up to ${newImage.offset()}"
|
||||
|
||||
// Apply changes to ACLs. This needs to be handled carefully because while we are
|
||||
// applying these changes, the Authorizer is continuing to return authorization
|
||||
// results in other threads. We never want to expose an invalid state. For example,
|
||||
// if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
|
||||
// we want to apply those changes in that order, not the reverse order! Otherwise
|
||||
// there could be a window during which incorrect authorization results are returned.
|
||||
Option(delta.aclsDelta()).foreach { aclsDelta =>
|
||||
authorizer match {
|
||||
case Some(authorizer: ClusterMetadataAuthorizer) => if (manifest.`type`().equals(LoaderManifestType.SNAPSHOT)) {
|
||||
try {
|
||||
// If the delta resulted from a snapshot load, we want to apply the new changes
|
||||
// all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
|
||||
// first snapshot load, it will also complete the futures returned by
|
||||
// Authorizer#start (which we wait for before processing RPCs).
|
||||
info(s"Loading authorizer snapshot at offset ${newImage.offset()}")
|
||||
authorizer.loadSnapshot(newImage.acls().acls())
|
||||
} catch {
|
||||
case t: Throwable => faultHandler.handleFault("Error loading " +
|
||||
s"authorizer snapshot in $deltaName", t)
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
// Because the changes map is a LinkedHashMap, the deltas will be returned in
|
||||
// the order they were performed.
|
||||
aclsDelta.changes().forEach((key, value) =>
|
||||
if (value.isPresent) {
|
||||
authorizer.addAcl(key, value.get())
|
||||
} else {
|
||||
authorizer.removeAcl(key)
|
||||
})
|
||||
} catch {
|
||||
case t: Throwable => faultHandler.handleFault("Error loading " +
|
||||
s"authorizer changes in $deltaName", t)
|
||||
}
|
||||
}
|
||||
if (!completedInitialLoad) {
|
||||
// If we are receiving this onMetadataUpdate call, that means the MetadataLoader has
|
||||
// loaded up to the local high water mark. So we complete the initial load, enabling
|
||||
// the authorizer.
|
||||
completedInitialLoad = true
|
||||
authorizer.completeInitialLoad()
|
||||
}
|
||||
case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def close(): Unit = {
|
||||
authorizer match {
|
||||
case Some(authorizer: ClusterMetadataAuthorizer) => authorizer.completeInitialLoad(new TimeoutException)
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
|
|||
import org.apache.kafka.image.loader.LoaderManifest
|
||||
import org.apache.kafka.image.publisher.MetadataPublisher
|
||||
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
|
||||
import org.apache.kafka.metadata.publisher.AclPublisher
|
||||
import org.apache.kafka.server.common.RequestLocal
|
||||
import org.apache.kafka.server.fault.FaultHandler
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
|
|||
import org.apache.kafka.coordinator.share.ShareCoordinator
|
||||
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance}
|
||||
import org.apache.kafka.image.loader.LogDeltaManifest
|
||||
import org.apache.kafka.metadata.publisher.AclPublisher
|
||||
import org.apache.kafka.network.SocketServerConfigs
|
||||
import org.apache.kafka.raft.LeaderAndEpoch
|
||||
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.metadata.publisher;
|
||||
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.image.MetadataDelta;
|
||||
import org.apache.kafka.image.MetadataImage;
|
||||
import org.apache.kafka.image.loader.LoaderManifest;
|
||||
import org.apache.kafka.image.loader.LoaderManifestType;
|
||||
import org.apache.kafka.image.publisher.MetadataPublisher;
|
||||
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
|
||||
import org.apache.kafka.server.authorizer.Authorizer;
|
||||
import org.apache.kafka.server.fault.FaultHandler;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class AclPublisher implements MetadataPublisher {
|
||||
private final Logger log;
|
||||
private final int nodeId;
|
||||
private final FaultHandler faultHandler;
|
||||
private final String nodeType;
|
||||
private final Optional<ClusterMetadataAuthorizer> authorizer;
|
||||
private boolean completedInitialLoad = false;
|
||||
|
||||
public AclPublisher(int nodeId, FaultHandler faultHandler, String nodeType, Optional<Authorizer> authorizer) {
|
||||
this.nodeId = nodeId;
|
||||
this.faultHandler = faultHandler;
|
||||
this.nodeType = nodeType;
|
||||
this.authorizer = authorizer.filter(ClusterMetadataAuthorizer.class::isInstance).map(ClusterMetadataAuthorizer.class::cast);
|
||||
this.log = new LogContext(name()).logger(AclPublisher.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String name() {
|
||||
return "AclPublisher " + nodeType + " id=" + nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
|
||||
String deltaName = "MetadataDelta up to " + newImage.offset();
|
||||
|
||||
// Apply changes to ACLs. This needs to be handled carefully because while we are
|
||||
// applying these changes, the Authorizer is continuing to return authorization
|
||||
// results in other threads. We never want to expose an invalid state. For example,
|
||||
// if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
|
||||
// we want to apply those changes in that order, not the reverse order! Otherwise
|
||||
// there could be a window during which incorrect authorization results are returned.
|
||||
Optional.ofNullable(delta.aclsDelta()).ifPresent(aclsDelta -> {
|
||||
authorizer.ifPresent(clusterMetadataAuthorizer -> {
|
||||
if (manifest.type().equals(LoaderManifestType.SNAPSHOT)) {
|
||||
try {
|
||||
// If the delta resulted from a snapshot load, we want to apply the new changes
|
||||
// all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
|
||||
// first snapshot load, it will also complete the futures returned by
|
||||
// Authorizer#start (which we wait for before processing RPCs).
|
||||
log.info("Loading authorizer snapshot at offset {}", newImage.offset());
|
||||
clusterMetadataAuthorizer.loadSnapshot(newImage.acls().acls());
|
||||
} catch (Throwable t) {
|
||||
faultHandler.handleFault("Error loading authorizer snapshot in " + deltaName, t);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
// Because the changes map is a LinkedHashMap, the deltas will be returned in
|
||||
// the order they were performed.
|
||||
aclsDelta.changes().forEach((key, value) -> {
|
||||
if (value.isPresent()) {
|
||||
clusterMetadataAuthorizer.addAcl(key, value.get());
|
||||
} else {
|
||||
clusterMetadataAuthorizer.removeAcl(key);
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
faultHandler.handleFault("Error loading authorizer changes in " + deltaName, t);
|
||||
}
|
||||
}
|
||||
if (!completedInitialLoad) {
|
||||
// If we are receiving this onMetadataUpdate call, that means the MetadataLoader has
|
||||
// loaded up to the local high water mark. So we complete the initial load, enabling
|
||||
// the authorizer.
|
||||
completedInitialLoad = true;
|
||||
clusterMetadataAuthorizer.completeInitialLoad();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
authorizer.ifPresent(clusterMetadataAuthorizer -> clusterMetadataAuthorizer.completeInitialLoad(new TimeoutException()));
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue