KAFKA-18888: Add KIP-877 support to Authorizer (#19050)

This also adds metrics to StandardAuthorizer

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TaiJuWu <tjwu1217@gmail.com>
Mickael Maison 2025-04-15 19:40:24 +02:00 committed by GitHub
parent e9ca0bb0f6
commit fb2ce76b49
43 changed files with 663 additions and 248 deletions

View File

@ -176,6 +176,9 @@
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.metrics.internals" />
<allow pkg="org.apache.kafka.common.metrics.stats" />
</subpackage>
<subpackage name="bootstrap">
<allow pkg="org.apache.kafka.snapshot" />

View File

@ -105,6 +105,7 @@
<subpackage name="security">
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.server.authorizer" />
</subpackage>

View File

@ -30,6 +30,13 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
/**
* Plugins have the following tags:
* <ul>
<li><code>config</code> set to the name of the configuration specifying the plugin</li>
* <li><code>class</code> set to the name of the instance class</li>
* </ul>
*/
public class Plugin<T> implements Supplier<T>, AutoCloseable {
private final T instance;
@ -40,14 +47,49 @@ public class Plugin<T> implements Supplier<T>, AutoCloseable {
this.pluginMetrics = Optional.ofNullable(pluginMetrics);
}
/**
* Wrap an instance into a Plugin.
* @param instance the instance to wrap
* @param metrics the metrics
* @param tagsSupplier supplier to retrieve the tags
* @return the plugin
*/
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, Supplier<Map<String, String>> tagsSupplier) {
PluginMetricsImpl pluginMetrics = null;
if (instance instanceof Monitorable && metrics != null) {
pluginMetrics = new PluginMetricsImpl(metrics, tagsSupplier.get());
((Monitorable) instance).withPluginMetrics(pluginMetrics);
}
return new Plugin<>(instance, pluginMetrics);
}
/**
* Wrap an instance into a Plugin.
* @param instance the instance to wrap
* @param metrics the metrics
* @param key the value for the <code>config</code> tag
* @return the plugin
*/
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key) {
return wrapInstance(instance, metrics, () -> tags(key, instance));
}
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, Map<String, String> extraTags) {
    Map<String, String> tags = tags(key, instance);
    tags.putAll(extraTags);
    return wrapInstance(instance, metrics, () -> tags);
}
/**
 * Wrap an instance into a Plugin.
 * @param instance the instance to wrap
 * @param metrics the metrics
 * @param key the value for the <code>config</code> tag
 * @param name extra tag name to add
 * @param value extra tag value to add
 * @return the plugin
 */
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, String name, String value) {
    Supplier<Map<String, String>> tagsSupplier = () -> {
        Map<String, String> tags = tags(key, instance);
        tags.put(name, value);
        return tags;
    };
    return wrapInstance(instance, metrics, tagsSupplier);
}
private static <T> Map<String, String> tags(String key, T instance) {
@ -57,6 +99,13 @@ public class Plugin<T> implements Supplier<T>, AutoCloseable {
return tags;
}
/**
* Wrap a list of instances into Plugins.
* @param instances the instances to wrap
* @param metrics the metrics
* @param key the value for the <code>config</code> tag
* @return the list of plugins
*/
public static <T> List<Plugin<T>> wrapInstances(List<T> instances, Metrics metrics, String key) {
List<Plugin<T>> plugins = new ArrayList<>();
for (T instance : instances) {
@ -65,15 +114,6 @@ public class Plugin<T> implements Supplier<T>, AutoCloseable {
return plugins;
}
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, Supplier<Map<String, String>> tagsSupplier) {
PluginMetricsImpl pluginMetrics = null;
if (instance instanceof Monitorable && metrics != null) {
pluginMetrics = new PluginMetricsImpl(metrics, tagsSupplier.get());
((Monitorable) instance).withPluginMetrics(pluginMetrics);
}
return new Plugin<>(instance, pluginMetrics);
}
@Override
public T get() {
return instance;

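As an aside, a minimal sketch of how these helpers are meant to be used. The CountingInterceptor class, its metric, and the "my.interceptor.class" configuration name are invented for illustration; only Plugin, Metrics, Monitorable and PluginMetrics above are real APIs.

import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

public class PluginWrapExample {
    // Hypothetical plugin: any instance passed to wrapInstance that implements
    // Monitorable receives the withPluginMetrics() callback before it is used.
    static class CountingInterceptor implements Monitorable {
        final AtomicLong invocations = new AtomicLong();

        @Override
        public void withPluginMetrics(PluginMetrics metrics) {
            // The config and class tags are attached automatically; only
            // metric-specific tags need to be supplied here.
            MetricName name = metrics.metricName("invocations", "Number of invocations", new LinkedHashMap<>());
            metrics.addMetric(name, (Measurable) (config, now) -> invocations.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Metrics metrics = new Metrics();
        CountingInterceptor interceptor = new CountingInterceptor();
        // The config tag is set to the name of the configuration the plugin came from.
        Plugin<CountingInterceptor> plugin = Plugin.wrapInstance(interceptor, metrics, "my.interceptor.class");
        plugin.get().invocations.incrementAndGet();
        plugin.close(); // releases the plugin metrics (and the instance, if it is AutoCloseable)
        metrics.close();
    }
}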
View File

@ -59,6 +59,10 @@ import java.util.concurrent.CompletionStage;
*
* Authorizer implementation class may optionally implement {@link org.apache.kafka.common.Reconfigurable}
* to enable dynamic reconfiguration without restarting the broker.
* <p>Authorizer implementation class may also optionally implement {@link org.apache.kafka.common.metrics.Monitorable}
* to enable the authorizer to register metrics. The following tags are automatically added to all metrics registered:
* <code>config</code> set to <code>authorizer.class.name</code>, <code>class</code> set to the Authorizer class name,
* and <code>role</code> set to either <code>broker</code> or <code>controller</code>.
* <p>
* <b>Threading model:</b>
* <ul>

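To make the Monitorable contract above concrete, here is a hedged sketch of a deny-all authorizer that registers one metric. The class and the denials-total metric name are invented; the config, class and role tags are added automatically by the plugin wrapping described above.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;

// Deny-all authorizer that exposes a single counter metric via KIP-877.
public class DenyAllMonitorableAuthorizer implements Authorizer, Monitorable {
    private final AtomicLong denied = new AtomicLong();

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // config, class and role tags are added automatically, so only
        // metric-specific tags would need to be supplied here.
        MetricName name = metrics.metricName("denials-total", "Total number of denied actions", new LinkedHashMap<>());
        metrics.addMetric(name, (Measurable) (config, now) -> denied.get());
    }

    @Override
    public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
        // Report every listener as immediately ready.
        return serverInfo.endpoints().stream()
            .collect(Collectors.toMap(e -> e, e -> CompletableFuture.<Void>completedFuture(null)));
    }

    @Override
    public List<AuthorizationResult> authorize(AuthorizableRequestContext context, List<Action> actions) {
        denied.addAndGet(actions.size());
        return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList());
    }

    @Override
    public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext context, List<AclBinding> bindings) {
        throw new UnsupportedOperationException("ACLs are not supported");
    }

    @Override
    public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext context, List<AclBindingFilter> filters) {
        throw new UnsupportedOperationException("ACLs are not supported");
    }

    @Override
    public Iterable<AclBinding> acls(AclBindingFilter filter) {
        return List.of();
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}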
View File

@ -27,7 +27,6 @@ import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.QuotaType;
import java.util.Map;
import java.util.Optional;
import scala.Option;
@ -150,7 +149,7 @@ public class QuotaFactory {
clientQuotaCallback,
metrics,
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG,
Map.of("role", role)
"role", role
));
}

View File

@ -28,6 +28,7 @@ import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
@ -58,7 +59,7 @@ public class KafkaApisBuilder {
private ConfigRepository configRepository = null;
private MetadataCache metadataCache = null;
private Metrics metrics = null;
private Optional<Authorizer> authorizer = Optional.empty();
private Optional<Plugin<Authorizer>> authorizerPlugin = Optional.empty();
private QuotaManagers quotas = null;
private FetchManager fetchManager = null;
private SharePartitionManager sharePartitionManager = null;
@ -131,8 +132,8 @@ public class KafkaApisBuilder {
return this;
}
public KafkaApisBuilder setAuthorizer(Optional<Authorizer> authorizer) {
this.authorizer = authorizer;
public KafkaApisBuilder setAuthorizerPlugin(Optional<Plugin<Authorizer>> authorizerPlugin) {
this.authorizerPlugin = authorizerPlugin;
return this;
}
@ -219,7 +220,7 @@ public class KafkaApisBuilder {
configRepository,
metadataCache,
metrics,
OptionConverters.toScala(authorizer),
OptionConverters.toScala(authorizerPlugin),
quotas,
fetchManager,
sharePartitionManager,

View File

@ -22,6 +22,7 @@ import kafka.utils.Logging
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclBinding
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult
import org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult
import org.apache.kafka.common.message._
@ -46,7 +47,7 @@ import scala.jdk.OptionConverters.RichOptional
* Logic to handle ACL requests.
*/
class AclApis(authHelper: AuthHelper,
authorizer: Option[Authorizer],
authorizerPlugin: Option[Plugin[Authorizer]],
requestHelper: RequestHandlerHelper,
role: ProcessRole,
config: KafkaConfig) extends Logging {
@ -61,7 +62,7 @@ class AclApis(authHelper: AuthHelper,
def handleDescribeAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, DESCRIBE)
val describeAclsRequest = request.body[DescribeAclsRequest]
authorizer match {
authorizerPlugin match {
case None =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeAclsResponse(new DescribeAclsResponseData()
@ -74,7 +75,7 @@ class AclApis(authHelper: AuthHelper,
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeAclsResponse(new DescribeAclsResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setResources(DescribeAclsResponse.aclsResources(auth.acls(filter))),
.setResources(DescribeAclsResponse.aclsResources(auth.get.acls(filter))),
describeAclsRequest.version))
}
CompletableFuture.completedFuture[Unit](())
@ -84,7 +85,7 @@ class AclApis(authHelper: AuthHelper,
authHelper.authorizeClusterOperation(request, ALTER)
val createAclsRequest = request.body[CreateAclsRequest]
authorizer match {
authorizerPlugin match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
createAclsRequest.getErrorResponse(requestThrottleMs,
new SecurityDisabledException("No Authorizer is configured.")))
@ -109,7 +110,7 @@ class AclApis(authHelper: AuthHelper,
}
val future = new CompletableFuture[util.List[AclCreationResult]]()
val createResults = auth.createAcls(request.context, validBindings.asJava).stream().map(_.toCompletableFuture).toList
val createResults = auth.get.createAcls(request.context, validBindings.asJava).stream().map(_.toCompletableFuture).toList
def sendResponseCallback(): Unit = {
val aclCreationResults = allBindings.map { acl =>
@ -139,7 +140,7 @@ class AclApis(authHelper: AuthHelper,
def handleDeleteAcls(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, ALTER)
val deleteAclsRequest = request.body[DeleteAclsRequest]
authorizer match {
authorizerPlugin match {
case None =>
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
deleteAclsRequest.getErrorResponse(requestThrottleMs,
@ -148,7 +149,7 @@ class AclApis(authHelper: AuthHelper,
case Some(auth) =>
val future = new CompletableFuture[util.List[DeleteAclsFilterResult]]()
val deleteResults = auth.deleteAcls(request.context, deleteAclsRequest.filters)
val deleteResults = auth.get.deleteAcls(request.context, deleteAclsRequest.filters)
.stream().map(_.toCompletableFuture).toList
def sendResponseCallback(): Unit = {

View File

@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.EndpointType
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.acl.AclOperation.DESCRIBE
import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.DescribeClusterResponseData
import org.apache.kafka.common.message.DescribeClusterResponseData.DescribeClusterBrokerCollection
import org.apache.kafka.common.protocol.Errors
@ -38,7 +39,7 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authoriz
import scala.collection.Seq
import scala.jdk.CollectionConverters._
class AuthHelper(authorizer: Option[Authorizer]) {
class AuthHelper(authorizer: Option[Plugin[Authorizer]]) {
def authorize(requestContext: RequestContext,
operation: AclOperation,
resourceType: ResourceType,
@ -49,7 +50,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
authorizer.forall { authZ =>
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
authZ.get.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
}
}
@ -64,7 +65,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
case Some(authZ) =>
val resourcePattern = new ResourcePattern(resource.resourceType, resource.name, PatternType.LITERAL)
val actions = supportedOps.map { op => new Action(op, resourcePattern, 1, false, false) }
authZ.authorize(request.context, actions.asJava).asScala
authZ.get.authorize(request.context, actions.asJava).asScala
.zip(supportedOps)
.filter(_._1 == AuthorizationResult.ALLOWED)
.map(_._2).toSet
@ -77,7 +78,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
def authorizeByResourceType(requestContext: RequestContext, operation: AclOperation,
resourceType: ResourceType): Boolean = {
authorizer.forall { authZ =>
authZ.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
authZ.get.authorizeByResourceType(requestContext, operation, resourceType) == AuthorizationResult.ALLOWED
}
}
@ -109,7 +110,7 @@ class AuthHelper(authorizer: Option[Authorizer]) {
val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
new Action(operation, resource, count, logIfAllowed, logIfDenied)
}.toBuffer
authZ.authorize(requestContext, actions.asJava).asScala
authZ.get.authorize(requestContext, actions.asJava).asScala
.zip(resourceNameToCount.keySet)
.collect { case (authzResult, resourceName) if authzResult == AuthorizationResult.ALLOWED =>
resourceName

View File

@ -27,6 +27,7 @@ import kafka.server.metadata._
import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl, SharePartitionManager}
import kafka.utils.CoreUtils
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@ -103,7 +104,7 @@ class BrokerServer(
@volatile var dataPlaneRequestProcessor: KafkaApis = _
var authorizer: Option[Authorizer] = None
var authorizerPlugin: Option[Plugin[Authorizer]] = None
@volatile var socketServer: SocketServer = _
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
@ -412,8 +413,7 @@ class BrokerServer(
)
// Create and initialize an authorizer if one is configured.
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.BrokerRole.toString)
// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
// for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
@ -456,7 +456,7 @@ class BrokerServer(
configRepository = metadataCache,
metadataCache = metadataCache,
metrics = metrics,
authorizer = authorizer,
authorizerPlugin = authorizerPlugin,
quotas = quotaManagers,
fetchManager = fetchManager,
sharePartitionManager = sharePartitionManager,
@ -529,7 +529,7 @@ class BrokerServer(
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
authorizer.toJava
authorizerPlugin.toJava
),
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler
@ -586,7 +586,7 @@ class BrokerServer(
// authorizer future is completed.
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
builder.build(authorizer.toJava,
builder.build(authorizerPlugin.toJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
@ -645,7 +645,7 @@ class BrokerServer(
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
.withGroupConfigManager(groupConfigManager)
.withPersister(persister)
.withAuthorizer(authorizer.toJava)
.withAuthorizerPlugin(authorizerPlugin.toJava)
.build()
}
@ -765,7 +765,7 @@ class BrokerServer(
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (dataPlaneRequestProcessor != null)
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin"))
/**
* We must shutdown the scheduler early because otherwise, the scheduler could touch other

View File

@ -34,8 +34,7 @@ import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, CREATE_TOKENS, DELETE, DESCRIBE, DESCRIBE_CONFIGS}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException, UnsupportedVersionException}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.{FatalExitError, Plugin, Topic}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
@ -68,7 +67,7 @@ import scala.jdk.CollectionConverters._
*/
class ControllerApis(
val requestChannel: RequestChannel,
val authorizer: Option[Authorizer],
val authorizerPlugin: Option[Plugin[Authorizer]],
val quotas: QuotaManagers,
val time: Time,
val controller: Controller,
@ -81,11 +80,11 @@ class ControllerApis(
) extends ApiRequestHandler with Logging {
this.logIdent = s"[ControllerApis nodeId=${config.nodeId}] "
val authHelper = new AuthHelper(authorizer)
val authHelper = new AuthHelper(authorizerPlugin)
val configHelper = new ConfigHelper(metadataCache, config, metadataCache)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
val runtimeLoggerManager = new RuntimeLoggerManager(config.nodeId, logger.underlying)
private val aclApis = new AclApis(authHelper, authorizer, requestHelper, ProcessRole.ControllerRole, config)
private val aclApis = new AclApis(authHelper, authorizerPlugin, requestHelper, ProcessRole.ControllerRole, config)
def isClosed: Boolean = aclApis.isClosed

View File

@ -24,6 +24,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.metrics.Metrics
@ -82,7 +83,7 @@ class ControllerServer(
var status: ProcessStatus = SHUTDOWN
var linuxIoMetricsCollector: LinuxIoMetricsCollector = _
@volatile var authorizer: Option[Authorizer] = None
@volatile var authorizerPlugin: Option[Plugin[Authorizer]] = None
var tokenCache: DelegationTokenCache = _
var credentialProvider: CredentialProvider = _
var socketServer: SocketServer = _
@ -138,8 +139,7 @@ class ControllerServer(
metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
}
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
authorizerPlugin = config.createNewAuthorizer(metrics, ProcessRole.ControllerRole.toString)
metadataCache = new KRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())
@ -181,7 +181,7 @@ class ControllerServer(
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
builder.build(authorizer.toJava,
builder.build(authorizerPlugin.toJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
@ -260,10 +260,12 @@ class ControllerServer(
// If we are using a ClusterMetadataAuthorizer, requests to add or remove ACLs must go
// through the controller.
authorizer match {
  case Some(a: ClusterMetadataAuthorizer) => a.setAclMutator(controller)
  case _ =>
}
authorizerPlugin.foreach { plugin =>
  plugin.get match {
    case a: ClusterMetadataAuthorizer => a.setAclMutator(controller)
    case _ =>
  }
}
quotaManagers = QuotaFactory.instantiate(config,
metrics,
@ -271,7 +273,7 @@ class ControllerServer(
s"controller-${config.nodeId}-", ProcessRole.ControllerRole.toString)
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
authorizer,
authorizerPlugin,
quotaManagers,
time,
controller,
@ -376,7 +378,7 @@ class ControllerServer(
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
authorizer.toJava
authorizerPlugin.toJava
))
// Install all metadata publishers.
@ -468,7 +470,7 @@ class ControllerServer(
CoreUtils.swallow(quotaManagers.shutdown(), this)
Utils.closeQuietly(controller, "controller")
Utils.closeQuietly(quorumControllerMetrics, "quorum controller metrics")
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer plugin"))
createTopicPolicy.foreach(policy => Utils.closeQuietly(policy, "create topic policy"))
alterConfigPolicy.foreach(policy => Utils.closeQuietly(policy, "alter config policy"))
socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))

View File

@ -283,10 +283,12 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
* directly. They are provided both old and new configs.
*/
def addReconfigurables(kafkaServer: KafkaBroker): Unit = {
kafkaServer.authorizer match {
  case Some(authz: Reconfigurable) => addReconfigurable(authz)
  case _ =>
}
kafkaServer.authorizerPlugin.foreach { plugin =>
  plugin.get match {
    case authz: Reconfigurable => addReconfigurable(authz)
    case _ =>
  }
}
addReconfigurable(kafkaServer.kafkaYammerMetrics)
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
addReconfigurable(new DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config))
@ -303,10 +305,12 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
* Add reconfigurables to be notified when a dynamic controller config is updated.
*/
def addReconfigurables(controller: ControllerServer): Unit = {
controller.authorizer match {
  case Some(authz: Reconfigurable) => addReconfigurable(authz)
  case _ =>
}
controller.authorizerPlugin.foreach { plugin =>
  plugin.get match {
    case authz: Reconfigurable => addReconfigurable(authz)
    case _ =>
  }
}
if (!kafkaConfig.processRoles.contains(ProcessRole.BrokerRole)) {
// only add these if the controller isn't also running the broker role
// because these would already be added via the broker in that case

View File

@ -29,7 +29,7 @@ import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.internals.{FatalExitError, Topic}
import org.apache.kafka.common.internals.{FatalExitError, Plugin, Topic}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult, AddPartitionsToTxnResultCollection}
import org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult, DeleteRecordsTopicResult}
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
@ -99,7 +99,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val configRepository: ConfigRepository,
val metadataCache: MetadataCache,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val authorizerPlugin: Option[Plugin[Authorizer]],
val quotas: QuotaManagers,
val fetchManager: FetchManager,
val sharePartitionManager: SharePartitionManager,
@ -115,9 +115,9 @@ class KafkaApis(val requestChannel: RequestChannel,
type FetchResponseStats = Map[TopicPartition, RecordValidationStats]
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
val configHelper = new ConfigHelper(metadataCache, config, configRepository)
val authHelper = new AuthHelper(authorizer)
val authHelper = new AuthHelper(authorizerPlugin)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
val aclApis = new AclApis(authHelper, authorizer, requestHelper, ProcessRole.BrokerRole, config)
val aclApis = new AclApis(authHelper, authorizerPlugin, requestHelper, ProcessRole.BrokerRole, config)
val configManager = new ConfigAdminManager(brokerId, config, configRepository)
val describeTopicPartitionsRequestHandler = new DescribeTopicPartitionsRequestHandler(
metadataCache, authHelper, config)
@ -2598,7 +2598,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// Clients are not allowed to see topics that are not authorized for Describe.
if (authorizer.isDefined) {
if (authorizerPlugin.isDefined) {
val topicsToCheck = response.groups.stream()
.flatMap(group => group.members.stream)
.flatMap(member => util.stream.Stream.of(member.assignment, member.targetAssignment))
@ -2779,7 +2779,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// Clients are not allowed to see topics that are not authorized for Describe.
if (authorizer.isDefined) {
if (authorizerPlugin.isDefined) {
val topicsToCheck = response.groups.stream()
.filter(group => group.topology != null)
.flatMap(group => group.topology.subtopologies.stream)
@ -2950,7 +2950,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// Clients are not allowed to see topics that are not authorized for Describe.
if (authorizer.isDefined) {
if (authorizerPlugin.isDefined) {
val topicsToCheck = response.groups.stream()
.flatMap(group => group.members.stream)
.flatMap(member => member.assignment.topicPartitions.stream)

View File

@ -22,7 +22,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.network.SocketServer
import kafka.utils.Logging
import org.apache.kafka.common.ClusterResource
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.internals.{ClusterResourceListeners, Plugin}
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@ -78,7 +78,7 @@ trait KafkaBroker extends Logging {
// acquire lock while handling Fetch requests.
val NumFetchSessionCacheShards: Int = 8
def authorizer: Option[Authorizer]
def authorizerPlugin: Option[Plugin[Authorizer]]
def brokerState: BrokerState
def clusterId: String
def config: KafkaConfig

View File

@ -27,6 +27,8 @@ import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResourc
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
@ -270,12 +272,12 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
/************* Authorizer Configuration ***********/
def createNewAuthorizer(): Option[Authorizer] = {
def createNewAuthorizer(metrics: Metrics, role: String): Option[Plugin[Authorizer]] = {
val className = getString(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG)
if (className == null || className.isEmpty)
None
else {
Some(AuthorizerUtils.createAuthorizer(className))
Some(AuthorizerUtils.createAuthorizer(className, originals, metrics, ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, role))
}
}

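The new AuthorizerUtils.createAuthorizer overload itself is not part of this excerpt. Presumably it instantiates the class, configures it, and wraps it with Plugin.wrapInstance so that a Monitorable authorizer receives metrics tagged with config, class and role. A hedged sketch of that flow:

import java.util.Map;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.Authorizer;

public final class AuthorizerUtilsSketch {
    // Sketch only: the real AuthorizerUtils.createAuthorizer may differ.
    public static Plugin<Authorizer> createAuthorizer(String className, Map<String, Object> configs,
                                                      Metrics metrics, String configName, String role) {
        try {
            Authorizer authorizer = Utils.newInstance(className, Authorizer.class);
            authorizer.configure(configs);
            // "role" is passed as an extra tag alongside the automatic config/class tags.
            return Plugin.wrapInstance(authorizer, metrics, configName, "role", role);
        } catch (ClassNotFoundException e) {
            throw new KafkaException("Authorizer class not found: " + className, e);
        }
    }
}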
View File

@ -53,6 +53,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
@ -2660,7 +2661,7 @@ class ReplicaManager(val config: KafkaConfig,
config.replicaSelectorClassName.map { className =>
val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector])
tmpReplicaSelector.configure(config.originals())
Plugin.wrapInstance(tmpReplicaSelector, metrics, className)
Plugin.wrapInstance(tmpReplicaSelector, metrics, ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG)
}
}

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
@ -113,6 +114,7 @@ class DescribeTopicPartitionsRequestHandlerTest {
void testDescribeTopicPartitionsRequest() {
// 1. Set up authorizer
Authorizer authorizer = mock(Authorizer.class);
Plugin<Authorizer> authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name");
String unauthorizedTopic = "unauthorized-topic";
String authorizedTopic = "authorized-topic";
String authorizedNonExistTopic = "authorized-non-exist";
@ -190,7 +192,7 @@ class DescribeTopicPartitionsRequestHandlerTest {
KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_1);
updateKraftMetadataCache(metadataCache, records);
DescribeTopicPartitionsRequestHandler handler =
new DescribeTopicPartitionsRequestHandler(metadataCache, new AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
new DescribeTopicPartitionsRequestHandler(metadataCache, new AuthHelper(scala.Option.apply(authorizerPlugin)), createKafkaDefaultConfig());
// 3.1 Basic test
DescribeTopicPartitionsRequest describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(
@ -312,6 +314,7 @@ class DescribeTopicPartitionsRequestHandlerTest {
void testDescribeTopicPartitionsRequestWithEdgeCases() {
// 1. Set up authorizer
Authorizer authorizer = mock(Authorizer.class);
Plugin<Authorizer> authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name");
String authorizedTopic = "authorized-topic1";
String authorizedTopic2 = "authorized-topic2";
@ -387,7 +390,7 @@ class DescribeTopicPartitionsRequestHandlerTest {
KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () -> KRaftVersion.KRAFT_VERSION_1);
updateKraftMetadataCache(metadataCache, records);
DescribeTopicPartitionsRequestHandler handler =
new DescribeTopicPartitionsRequestHandler(metadataCache, new AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
new DescribeTopicPartitionsRequestHandler(metadataCache, new AuthHelper(scala.Option.apply(authorizerPlugin)), createKafkaDefaultConfig());
// 3.1 With cursor point to the first one
DescribeTopicPartitionsRequest describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()

View File

@ -2641,8 +2641,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
map(_.toCompletableFuture.get).flatMap { deletion =>
deletion.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern).toSet
}.foreach { resource =>
(brokers.map(_.authorizer.get) ++ controllerServers.map(_.authorizer.get)).foreach { authorizer =>
TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer, resource, aclEntryFilter)
(brokers.map(_.authorizerPlugin.get) ++ controllerServers.map(_.authorizerPlugin.get)).foreach { authorizer =>
TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer.get, resource, aclEntryFilter)
}
}
}

View File

@ -70,7 +70,7 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE
superuserAdminClient.createAcls(List(AclTokenOtherDescribe, AclTokenCreate, AclTokenDescribe).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TokenCreateAcl ++ TokenDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get,
TestUtils.waitAndVerifyAcls(TokenCreateAcl ++ TokenDescribeAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get,
new ResourcePattern(USER, clientPrincipal.toString, LITERAL))
}
}

View File

@ -251,8 +251,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclWildcardGroupRead).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource)
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, wildcardTopicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, wildcardGroupResource)
}
}
@ -262,8 +262,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclPrefixedGroupRead).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource)
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, prefixedTopicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, prefixedGroupResource)
}
}
@ -276,9 +276,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get,
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get,
new ResourcePattern(TOPIC, tp.topic, LITERAL))
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, groupResource)
}
}
@ -292,7 +292,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val superuserAdminClient = createSuperuserAdminClient()
superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, groupResource)
}
}
@ -382,7 +382,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclTopicDescribe()).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, topicResource)
}
val prop = new Properties()
@ -450,8 +450,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, groupResource)
}
val producer = createProducer()
@ -461,8 +461,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.deleteAcls(List(AclTopicWrite().toFilter).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
TestUtils.waitAndVerifyAcls(TopicCreateAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, groupResource)
}
}
@ -496,8 +496,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
superuserAdminClient.createAcls(List(AclGroupRead).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, groupResource)
}
val producer = createProducer()
sendRecords(producer, numRecords, tp)
@ -513,7 +513,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val superuserAdminClient = createSuperuserAdminClient()
superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values
brokers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizerPlugin.get, topicResource)
}
val producer = createProducer()
sendRecords(producer, numRecords, tp)

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUS
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, DelegationTokenExpiredException, DelegationTokenNotFoundException, InvalidRequestException, TimeoutException, TopicAuthorizationException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
@ -96,9 +97,9 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
superUserAdmin.createAcls(clusterAcls.map(ace => new AclBinding(clusterResourcePattern, ace)).asJava)
brokers.foreach { b =>
TestUtils.waitAndVerifyAcls(Set(ace), b.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(TOPIC, "*", LITERAL))
TestUtils.waitAndVerifyAcls(Set(ace), b.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(GROUP, "*", LITERAL))
TestUtils.waitAndVerifyAcls(clusterAcls.toSet, b.dataPlaneRequestProcessor.authorizer.get, clusterResourcePattern)
TestUtils.waitAndVerifyAcls(Set(ace), b.dataPlaneRequestProcessor.authorizerPlugin.get, new ResourcePattern(TOPIC, "*", LITERAL))
TestUtils.waitAndVerifyAcls(Set(ace), b.dataPlaneRequestProcessor.authorizerPlugin.get, new ResourcePattern(GROUP, "*", LITERAL))
TestUtils.waitAndVerifyAcls(clusterAcls.toSet, b.dataPlaneRequestProcessor.authorizerPlugin.get, clusterResourcePattern)
}
}
@ -283,7 +284,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
// Delete only ACLs on literal 'mytopic2' topic
var deleted = client.deleteAcls(List(acl2.toFilter).asJava).all().get().asScala.toSet
brokers.foreach { b =>
waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl2.pattern())
waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizerPlugin.get, acl2.pattern())
}
assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls))
@ -292,7 +293,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
// Delete only ACLs on literal '*' topic
deleted = client.deleteAcls(List(anyAcl.toFilter).asJava).all().get().asScala.toSet
brokers.foreach { b =>
waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, anyAcl.pattern())
waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizerPlugin.get, anyAcl.pattern())
}
assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
@ -301,7 +302,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
// Delete only ACLs on specific prefixed 'mytopic' topics:
deleted = client.deleteAcls(List(prefixAcl.toFilter).asJava).all().get().asScala.toSet
brokers.foreach { b =>
waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, prefixAcl.pattern())
waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizerPlugin.get, prefixAcl.pattern())
}
assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls))
@ -311,7 +312,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
deleted = client.deleteAcls(List(allLiteralTopicAcls).asJava).all().get().asScala.toSet
brokers.foreach { b =>
Set(anyAcl, acl2, fooAcl).foreach(acl =>
waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern())
waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizerPlugin.get, acl.pattern())
)
}
assertEquals(Set(prefixAcl), getAcls(allTopicAcls))
@ -321,7 +322,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
// Delete all prefixed ACLs:
deleted = client.deleteAcls(List(allPrefixedTopicAcls).asJava).all().get().asScala.toSet
brokers.foreach { b =>
waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, prefixAcl.pattern())
waitAndVerifyRemovedAcl(prefixAcl.entry(), b.dataPlaneRequestProcessor.authorizerPlugin.get, prefixAcl.pattern())
}
assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls))
@ -331,7 +332,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
deleted = client.deleteAcls(List(allTopicAcls).asJava).all().get().asScala.toSet
brokers.foreach { b =>
Set(anyAcl, acl2, fooAcl, prefixAcl).foreach(acl =>
waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern())
waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizerPlugin.get, acl.pattern())
)
}
assertEquals(Set(), getAcls(allTopicAcls))
@ -357,7 +358,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
// Delete only (legacy) ACLs on 'mytopic2' topic
var deleted = client.deleteAcls(List(legacyMyTopic2Acls).asJava).all().get().asScala.toSet
brokers.foreach { b =>
waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl2.pattern())
waitAndVerifyRemovedAcl(acl2.entry(), b.dataPlaneRequestProcessor.authorizerPlugin.get, acl2.pattern())
}
assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls))
@ -366,7 +367,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
// Delete only (legacy) ACLs on '*' topic
deleted = client.deleteAcls(List(legacyAnyTopicAcls).asJava).all().get().asScala.toSet
brokers.foreach { b =>
waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizer.get, anyAcl.pattern())
waitAndVerifyRemovedAcl(anyAcl.entry(), b.dataPlaneRequestProcessor.authorizerPlugin.get, anyAcl.pattern())
}
assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
@ -376,7 +377,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
deleted = client.deleteAcls(List(legacyAllTopicAcls).asJava).all().get().asScala.toSet
brokers.foreach { b =>
Set(anyAcl, acl2, fooAcl).foreach(acl =>
waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizer.get, acl.pattern())
waitAndVerifyRemovedAcl(acl.entry(), b.dataPlaneRequestProcessor.authorizerPlugin.get, acl.pattern())
)
}
assertEquals(Set(), getAcls(legacyAllTopicAcls))
@ -542,7 +543,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
val ace = clusterAcl(permissionType, operation)
superUserAdmin.createAcls(List(new AclBinding(clusterResourcePattern, ace)).asJava)
brokers.foreach { b =>
waitAndVerifyAcl(ace, b.dataPlaneRequestProcessor.authorizer.get, clusterResourcePattern)
waitAndVerifyAcl(ace, b.dataPlaneRequestProcessor.authorizerPlugin.get, clusterResourcePattern)
}
}
@ -551,7 +552,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
superUserAdmin.deleteAcls(List(new AclBinding(clusterResourcePattern, ace).toFilter).asJava).values
brokers.foreach { b =>
waitAndVerifyRemovedAcl(ace, b.dataPlaneRequestProcessor.authorizer.get, clusterResourcePattern)
waitAndVerifyRemovedAcl(ace, b.dataPlaneRequestProcessor.authorizerPlugin.get, clusterResourcePattern)
}
}
@ -709,11 +710,11 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
def waitAndVerifyRemovedAcl(expectedToRemoved: AccessControlEntry,
authorizer: JAuthorizer,
authorizerPlugin: Plugin[JAuthorizer],
resource: ResourcePattern,
accessControlEntryFilter: AccessControlEntryFilter = AccessControlEntryFilter.ANY): Unit = {
val newLine = scala.util.Properties.lineSeparator
val authorizer = authorizerPlugin.get
val filter = new AclBindingFilter(resource.toFilter, accessControlEntryFilter)
waitUntilTrue(() => !authorizer.acls(filter).asScala.map(_.entry).toSet.contains(expectedToRemoved),
s"expected acl to be removed : $expectedToRemoved" +
@ -722,11 +723,11 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
}
def waitAndVerifyAcl(expected: AccessControlEntry,
authorizer: JAuthorizer,
authorizerPlugin: Plugin[JAuthorizer],
resource: ResourcePattern,
accessControlEntryFilter: AccessControlEntryFilter = AccessControlEntryFilter.ANY): Unit = {
val newLine = scala.util.Properties.lineSeparator
val authorizer = authorizerPlugin.get
val filter = new AclBindingFilter(resource.toFilter, accessControlEntryFilter)
waitUntilTrue(() => authorizer.acls(filter).asScala.map(_.entry).toSet.contains(expected),
s"expected to contain acl: $expected" +

View File

@ -1229,9 +1229,9 @@ class KRaftClusterTest {
def assertFoobarValue(expected: Int): Unit = {
TestUtils.retry(60000) {
assertEquals(expected, cluster.controllers().values().iterator().next().
authorizer.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
authorizerPlugin.get.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
assertEquals(expected, cluster.brokers().values().iterator().next().
authorizer.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
authorizerPlugin.get.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
}
}

View File

@ -223,9 +223,9 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
result.exception.ifPresent { e => throw e }
}
val aclFilter = new AclBindingFilter(resource.toFilter, AccessControlEntryFilter.ANY)
(brokers.map(_.authorizer.get) ++ controllerServers.map(_.authorizer.get)).foreach {
(brokers.map(_.authorizerPlugin.get) ++ controllerServers.map(_.authorizerPlugin.get)).foreach {
authorizer => waitAndVerifyAcls(
authorizer.acls(aclFilter).asScala.map(_.entry).toSet ++ acls,
authorizer.get.acls(aclFilter).asScala.map(_.entry).toSet ++ acls,
authorizer, resource)
}
}
@ -239,9 +239,9 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
result.exception.ifPresent { e => throw e }
}
val aclFilter = new AclBindingFilter(resource.toFilter, AccessControlEntryFilter.ANY)
(brokers.map(_.authorizer.get) ++ controllerServers.map(_.authorizer.get)).foreach {
(brokers.map(_.authorizerPlugin.get) ++ controllerServers.map(_.authorizerPlugin.get)).foreach {
authorizer => waitAndVerifyAcls(
authorizer.acls(aclFilter).asScala.map(_.entry).toSet -- acls,
authorizer.get.acls(aclFilter).asScala.map(_.entry).toSet -- acls,
authorizer, resource)
}
}

View File

@ -23,6 +23,8 @@ import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.metrics.{Metrics, PluginMetrics}
import org.apache.kafka.common.metrics.internals.PluginMetricsImpl
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED}
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
@ -82,9 +84,9 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val props = properties
config = KafkaConfig.fromProps(props)
authorizer1 = createAuthorizer()
configureAuthorizer(authorizer1, config.originals)
configureAuthorizer(authorizer1, config.originals, new PluginMetricsImpl(new Metrics(), util.Map.of()))
authorizer2 = createAuthorizer()
configureAuthorizer(authorizer2, config.originals)
configureAuthorizer(authorizer2, config.originals, new PluginMetricsImpl(new Metrics(), util.Map.of()))
resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
}
@ -297,7 +299,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val cfg = KafkaConfig.fromProps(props)
val testAuthorizer = createAuthorizer()
try {
configureAuthorizer(testAuthorizer, cfg.originals)
configureAuthorizer(testAuthorizer, cfg.originals, new PluginMetricsImpl(new Metrics(), util.Map.of()))
assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
"when acls = null or [], authorizer should allow op with allow.everyone = true.")
} finally {
@ -315,7 +317,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val cfg = KafkaConfig.fromProps(props)
val testAuthorizer = createAuthorizer()
try {
configureAuthorizer(testAuthorizer, cfg.originals)
configureAuthorizer(testAuthorizer, cfg.originals, new PluginMetricsImpl(new Metrics(), util.Map.of()))
assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
"when acls = null or [], authorizer should allow op with allow.everyone = true.")
} finally {
@ -366,7 +368,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
//test remove all acls for resource
removeAcls(authorizer1, Set.empty, resource)
TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, resource)
TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1, resource, AccessControlEntryFilter.ANY)
acls = changeAclAndVerify(Set.empty, Set(acl1), Set.empty)
changeAclAndVerify(acls, Set.empty, acls)
@ -385,7 +387,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
addAcls(authorizer1, Set(acl1), commonResource)
addAcls(authorizer1, Set(acl2), commonResource)
TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource)
TestUtils.waitAndVerifyAcls(Set(acl1, acl2), authorizer1, commonResource, AccessControlEntryFilter.ANY)
}
/**
@ -629,7 +631,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
val cfg = KafkaConfig.fromProps(props)
val authorizer: Authorizer = createAuthorizer()
try {
configureAuthorizer(authorizer, cfg.originals)
configureAuthorizer(authorizer, cfg.originals, new PluginMetricsImpl(new Metrics(), util.Map.of()))
assertTrue(authorizeByResourceType(authorizer, requestContext, READ, resource.resourceType()),
"If allow.everyone.if.no.acl.found = true, caller should have read access to at least one topic")
assertTrue(authorizeByResourceType(authorizer, requestContext, WRITE, resource.resourceType()),
@ -655,7 +657,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
acls --= removedAcls
}
TestUtils.waitAndVerifyAcls(acls, authorizer1, resource)
TestUtils.waitAndVerifyAcls(acls, authorizer1, resource, AccessControlEntryFilter.ANY)
acls
}
@ -689,13 +691,16 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
}
def configureAuthorizer(authorizer: Authorizer,
configs: util.Map[String, AnyRef]): Unit = {
configureStandardAuthorizer(authorizer.asInstanceOf[StandardAuthorizer], configs)
configs: util.Map[String, AnyRef],
pluginMetrics: PluginMetrics): Unit = {
configureStandardAuthorizer(authorizer.asInstanceOf[StandardAuthorizer], configs, pluginMetrics)
}
def configureStandardAuthorizer(standardAuthorizer: StandardAuthorizer,
configs: util.Map[String, AnyRef]): Unit = {
configs: util.Map[String, AnyRef],
pluginMetrics: PluginMetrics): Unit = {
standardAuthorizer.configure(configs)
standardAuthorizer.withPluginMetrics(pluginMetrics)
initializeStandardAuthorizer(standardAuthorizer, new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)))
}

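For reference, a hedged sketch (in Java) of how the plugin-metrics wiring above can be exercised outside the test harness. Which metrics StandardAuthorizer actually registers is not shown in this excerpt, so the printout is purely illustrative.

import java.util.Map;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;

public class PluginMetricsSmokeTest {
    public static void main(String[] args) throws Exception {
        Metrics metrics = new Metrics();
        PluginMetricsImpl pluginMetrics = new PluginMetricsImpl(metrics, Map.of("role", "broker"));
        StandardAuthorizer authorizer = new StandardAuthorizer();
        authorizer.configure(Map.of());
        // After this call, any metrics the authorizer registers are visible in
        // the shared Metrics registry, tagged with the map supplied above.
        authorizer.withPluginMetrics(pluginMetrics);
        metrics.metrics().keySet().forEach(name ->
            System.out.println(name.group() + "/" + name.name() + " " + name.tags()));
        authorizer.close();
        pluginMetrics.close();
        metrics.close();
    }
}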
View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.EndpointType
import java.net.InetAddress
import java.util
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.{DescribeClusterRequestData, DescribeClusterResponseData}
import org.apache.kafka.common.message.DescribeClusterResponseData.DescribeClusterBrokerCollection
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
@ -71,6 +72,7 @@ class AuthHelperTest {
@Test
def testAuthorize(): Unit = {
val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
val operation = AclOperation.WRITE
val resourceType = ResourceType.TOPIC
@ -88,7 +90,7 @@ class AuthHelperTest {
when(authorizer.authorize(requestContext, expectedActions.asJava))
.thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
val result = new AuthHelper(Some(authorizer)).authorize(
val result = new AuthHelper(Some(authorizerPlugin)).authorize(
requestContext, operation, resourceType, resourceName)
verify(authorizer).authorize(requestContext, expectedActions.asJava)
@ -99,6 +101,7 @@ class AuthHelperTest {
@Test
def testFilterByAuthorized(): Unit = {
val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
val operation = AclOperation.WRITE
val resourceType = ResourceType.TOPIC
@ -132,7 +135,7 @@ class AuthHelperTest {
}.asJava
}
val result = new AuthHelper(Some(authorizer)).filterByAuthorized(
val result = new AuthHelper(Some(authorizerPlugin)).filterByAuthorized(
requestContext,
operation,
resourceType,
@ -149,7 +152,9 @@ class AuthHelperTest {
@Test
def testComputeDescribeClusterResponseV1WithUnknownEndpointType(): Unit = {
val authHelper = new AuthHelper(Some(mock(classOf[Authorizer])))
val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
val authHelper = new AuthHelper(Some(authorizerPlugin))
val request = newMockDescribeClusterRequest(
new DescribeClusterRequestData().setEndpointType(123.toByte), 1)
val responseData = authHelper.computeDescribeClusterResponse(request,
@ -164,7 +169,9 @@ class AuthHelperTest {
@Test
def testComputeDescribeClusterResponseV0WithUnknownEndpointType(): Unit = {
val authHelper = new AuthHelper(Some(mock(classOf[Authorizer])))
val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
val authHelper = new AuthHelper(Some(authorizerPlugin))
val request = newMockDescribeClusterRequest(
new DescribeClusterRequestData().setEndpointType(123.toByte), 0)
val responseData = authHelper.computeDescribeClusterResponse(request,
@ -179,7 +186,9 @@ class AuthHelperTest {
@Test
def testComputeDescribeClusterResponseV1WithUnexpectedEndpointType(): Unit = {
val authHelper = new AuthHelper(Some(mock(classOf[Authorizer])))
val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
val authHelper = new AuthHelper(Some(authorizerPlugin))
val request = newMockDescribeClusterRequest(
new DescribeClusterRequestData().setEndpointType(EndpointType.BROKER.id()), 1)
val responseData = authHelper.computeDescribeClusterResponse(request,
@@ -194,7 +203,9 @@ class AuthHelperTest {
@Test
def testComputeDescribeClusterResponseV0WithUnexpectedEndpointType(): Unit = {
val authHelper = new AuthHelper(Some(mock(classOf[Authorizer])))
val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
val authHelper = new AuthHelper(Some(authorizerPlugin))
val request = newMockDescribeClusterRequest(
new DescribeClusterRequestData().setEndpointType(EndpointType.BROKER.id()), 0)
val responseData = authHelper.computeDescribeClusterResponse(request,
@@ -209,7 +220,9 @@ class AuthHelperTest {
@Test
def testComputeDescribeClusterResponseWhereControllerIsNotFound(): Unit = {
val authHelper = new AuthHelper(Some(mock(classOf[Authorizer])))
val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
val authHelper = new AuthHelper(Some(authorizerPlugin))
val request = newMockDescribeClusterRequest(
new DescribeClusterRequestData().setEndpointType(EndpointType.CONTROLLER.id()), 1)
val responseData = authHelper.computeDescribeClusterResponse(request,
@@ -226,7 +239,9 @@ class AuthHelperTest {
@Test
def testComputeDescribeClusterResponseSuccess(): Unit = {
val authHelper = new AuthHelper(Some(mock(classOf[Authorizer])))
val authorizer: Authorizer = mock(classOf[Authorizer])
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
val authHelper = new AuthHelper(Some(authorizerPlugin))
val request = newMockDescribeClusterRequest(
new DescribeClusterRequestData().setEndpointType(EndpointType.CONTROLLER.id()), 1)
val nodes = new DescribeClusterBrokerCollection(

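Every test in this file wraps its mocked Authorizer the same way before handing it to AuthHelper. The pattern in isolation, as a hedged sketch; with a null Metrics registry (and a mock that is not Monitorable) the wrapper registers no metrics and simply holds the instance:

import static org.mockito.Mockito.mock;

import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.server.authorizer.Authorizer;

class WrapMockedAuthorizerSketch {
    static Plugin<Authorizer> wrap() {
        Authorizer authorizer = mock(Authorizer.class);
        // plugin.get() returns the mock unchanged, so verify(...) calls still work.
        return Plugin.wrapInstance(authorizer, null, "authorizer.class.name");
    }
}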
View File

@@ -26,7 +26,7 @@ import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.{Plugin, Topic}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource, AlterConfigsResourceCollection => OldAlterConfigsResourceCollection, AlterableConfig => OldAlterableConfig, AlterableConfigCollection => OldAlterableConfigCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
@@ -151,7 +151,7 @@ class ControllerApisTest {
private var controllerApis: ControllerApis = _
private def createControllerApis(authorizer: Option[Authorizer],
private def createControllerApis(authorizer: Option[Plugin[Authorizer]],
controller: Controller,
props: Properties = new Properties(),
throttle: Boolean = false): ControllerApis = {
@@ -200,7 +200,7 @@ class ControllerApisTest {
requestChannelMetrics)
}
def createDenyAllAuthorizer(): Authorizer = {
def createDenyAllAuthorizer(): Plugin[Authorizer] = {
val authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(
any(classOf[AuthorizableRequestContext]),
@@ -208,7 +208,7 @@ class ControllerApisTest {
)).thenReturn(
singletonList(AuthorizationResult.DENIED)
)
authorizer
Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
}
@Test
@@ -1009,7 +1009,8 @@ class ControllerApisTest {
.newInitialTopic("foo", Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"))
.build()
val authorizer = mock(classOf[Authorizer])
controllerApis = createControllerApis(Some(authorizer), controller)
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
controllerApis = createControllerApis(Some(authorizerPlugin), controller)
val requestData = new CreatePartitionsRequestData()
requestData.topics().add(new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2))
requestData.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10))
@@ -1069,8 +1070,9 @@ class ControllerApisTest {
@Test
def testElectLeadersAuthorization(): Unit = {
val authorizer = mock(classOf[Authorizer])
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
val controller = mock(classOf[Controller])
controllerApis = createControllerApis(Some(authorizer), controller)
controllerApis = createControllerApis(Some(authorizerPlugin), controller)
val request = new ElectLeadersRequest.Builder(
ElectionType.PREFERRED,
null,
@@ -1213,7 +1215,8 @@ class ControllerApisTest {
def testAssignReplicasToDirs(): Unit = {
val controller = mock(classOf[Controller])
val authorizer = mock(classOf[Authorizer])
controllerApis = createControllerApis(Some(authorizer), controller)
val authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
controllerApis = createControllerApis(Some(authorizerPlugin), controller)
val request = new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData()).build()
when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(Collections.singletonList(new Action(

View File

@@ -28,6 +28,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -480,7 +481,8 @@ class DynamicBrokerConfigTest {
when(kafkaServer.logManager).thenReturn(logManager)
val authorizer = new TestAuthorizer
when(kafkaServer.authorizer).thenReturn(Some(authorizer))
val authorizerPlugin: Plugin[Authorizer] = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
when(kafkaServer.authorizerPlugin).thenReturn(Some(authorizerPlugin))
kafkaServer.config.dynamicConfig.addReconfigurables(kafkaServer)
props.put("super.users", "User:admin")
@@ -522,7 +524,8 @@ class DynamicBrokerConfigTest {
when(controllerServer.socketServer).thenReturn(socketServer)
val authorizer = new TestAuthorizer
when(controllerServer.authorizer).thenReturn(Some(authorizer))
val authorizerPlugin: Plugin[Authorizer] = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
when(controllerServer.authorizerPlugin).thenReturn(Some(authorizerPlugin))
controllerServer.config.dynamicConfig.addReconfigurables(controllerServer)
props.put("super.users", "User:admin")
@@ -567,7 +570,8 @@ class DynamicBrokerConfigTest {
when(controllerServer.socketServer).thenReturn(socketServer)
val authorizer = new TestAuthorizer
when(controllerServer.authorizer).thenReturn(Some(authorizer))
val authorizerPlugin: Plugin[Authorizer] = Plugin.wrapInstance(authorizer, null, "authorizer.class.name")
when(controllerServer.authorizerPlugin).thenReturn(Some(authorizerPlugin))
controllerServer.config.dynamicConfig.addReconfigurables(controllerServer)
props.put("super.users", "User:admin")

View File

@@ -32,7 +32,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.{Plugin, Topic}
import org.apache.kafka.common.internals.Topic.SHARE_GROUP_STATE_TOPIC_NAME
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
@@ -199,7 +199,7 @@ class KafkaApisTest extends Logging {
configRepository = configRepository,
metadataCache = metadataCache,
metrics = metrics,
authorizer = authorizer,
authorizerPlugin = authorizer.map(Plugin.wrapInstance(_, null, "authorizer.class.name")),
quotas = quotas,
fetchManager = fetchManager,
sharePartitionManager = sharePartitionManager,

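The Scala call site lifts Option[Authorizer] into Option[Plugin[Authorizer]] with a single map. The Java equivalent, shown as a fragment (authorizer is assumed to be an Optional<Authorizer> from test setup):

// Hypothetical surrounding code: authorizer is an Optional<Authorizer>.
Optional<Plugin<Authorizer>> authorizerPlugin =
    authorizer.map(a -> Plugin.wrapInstance(a, null, "authorizer.class.name"));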
View File

@@ -33,7 +33,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.errors.{OperationNotAttemptedException, TopicExistsException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.{Plugin, Topic}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ConnectionMode, ListenerName}
@@ -1122,11 +1122,17 @@ object TestUtils extends Logging {
}
def waitAndVerifyAcls(expected: Set[AccessControlEntry],
authorizer: JAuthorizer,
authorizerPlugin: Plugin[JAuthorizer],
resource: ResourcePattern,
accessControlEntryFilter: AccessControlEntryFilter = AccessControlEntryFilter.ANY): Unit = {
val newLine = scala.util.Properties.lineSeparator
waitAndVerifyAcls(expected, authorizerPlugin.get, resource, accessControlEntryFilter)
}
def waitAndVerifyAcls(expected: Set[AccessControlEntry],
authorizer: JAuthorizer,
resource: ResourcePattern,
accessControlEntryFilter: AccessControlEntryFilter): Unit = {
val newLine = scala.util.Properties.lineSeparator
val filter = new AclBindingFilter(resource.toFilter, accessControlEntryFilter)
waitUntilTrue(() => authorizer.acls(filter).asScala.map(_.entry).toSet == expected,
s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
@@ -1431,12 +1437,12 @@ object TestUtils extends Logging {
controllers: Seq[ControllerServer],
): JAuthorizer = {
if (controllers.isEmpty) {
brokers.head.authorizer.get
brokers.head.authorizerPlugin.get.get
} else {
var result: JAuthorizer = null
TestUtils.retry(120000) {
val active = controllers.filter(_.controller.isActive).head
result = active.authorizer.get
result = active.authorizerPlugin.get.get
}
result
}

View File

@@ -21,6 +21,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
@@ -144,7 +145,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
private GroupCoordinatorMetrics groupCoordinatorMetrics;
private GroupConfigManager groupConfigManager;
private Persister persister;
private Optional<Authorizer> authorizer;
private Optional<Plugin<Authorizer>> authorizerPlugin;
public Builder(
int nodeId,
@@ -194,8 +195,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
return this;
}
public Builder withAuthorizer(Optional<Authorizer> authorizer) {
this.authorizer = authorizer;
public Builder withAuthorizerPlugin(Optional<Plugin<Authorizer>> authorizerPlugin) {
this.authorizerPlugin = authorizerPlugin;
return this;
}
@@ -209,14 +210,14 @@ public class GroupCoordinatorService implements GroupCoordinator {
requireNonNull(groupCoordinatorMetrics, new IllegalArgumentException("GroupCoordinatorMetrics must be set."));
requireNonNull(groupConfigManager, new IllegalArgumentException("GroupConfigManager must be set."));
requireNonNull(persister, new IllegalArgumentException("Persister must be set."));
requireNonNull(authorizer, new IllegalArgumentException("Authorizer must be set."));
requireNonNull(authorizerPlugin, new IllegalArgumentException("Authorizer must be set."));
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
CoordinatorShardBuilderSupplier<GroupCoordinatorShard, CoordinatorRecord> supplier = () ->
new GroupCoordinatorShard.Builder(config, groupConfigManager)
.withAuthorizer(authorizer);
.withAuthorizerPlugin(authorizerPlugin);
CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
logContext,

View File

@@ -22,6 +22,7 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@@ -163,7 +164,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
private CoordinatorExecutor<CoordinatorRecord> executor;
private CoordinatorMetrics coordinatorMetrics;
private TopicPartition topicPartition;
private Optional<Authorizer> authorizer;
private Optional<Plugin<Authorizer>> authorizerPlugin;
public Builder(
GroupCoordinatorConfig config,
@@ -227,10 +228,10 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return this;
}
public CoordinatorShardBuilder<GroupCoordinatorShard, CoordinatorRecord> withAuthorizer(
Optional<Authorizer> authorizer
public CoordinatorShardBuilder<GroupCoordinatorShard, CoordinatorRecord> withAuthorizerPlugin(
Optional<Plugin<Authorizer>> authorizerPlugin
) {
this.authorizer = authorizer;
this.authorizerPlugin = authorizerPlugin;
return this;
}
@@ -254,7 +255,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
throw new IllegalArgumentException("TopicPartition must be set.");
if (groupConfigManager == null)
throw new IllegalArgumentException("GroupConfigManager must be set.");
if (authorizer == null)
if (authorizerPlugin == null)
throw new IllegalArgumentException("Authorizer must be set.");
GroupCoordinatorMetricsShard metricsShard = ((GroupCoordinatorMetrics) coordinatorMetrics)
@@ -269,7 +270,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.withConfig(config)
.withGroupConfigManager(groupConfigManager)
.withGroupCoordinatorMetricsShard(metricsShard)
.withAuthorizer(authorizer)
.withAuthorizerPlugin(authorizerPlugin)
.build();
OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder()

View File

@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@@ -301,7 +302,7 @@ public class GroupMetadataManager {
private MetadataImage metadataImage = null;
private ShareGroupPartitionAssignor shareGroupAssignor = null;
private GroupCoordinatorMetricsShard metrics;
private Optional<Authorizer> authorizer = null;
private Optional<Plugin<Authorizer>> authorizerPlugin = null;
private List<TaskAssignor> streamsGroupAssignors = null;
Builder withLogContext(LogContext logContext) {
@@ -359,8 +360,8 @@ public class GroupMetadataManager {
return this;
}
Builder withAuthorizer(Optional<Authorizer> authorizer) {
this.authorizer = authorizer;
Builder withAuthorizerPlugin(Optional<Plugin<Authorizer>> authorizerPlugin) {
this.authorizerPlugin = authorizerPlugin;
return this;
}
@@ -369,7 +370,7 @@ public class GroupMetadataManager {
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (time == null) time = Time.SYSTEM;
if (authorizer == null) authorizer = Optional.empty();
if (authorizerPlugin == null) authorizerPlugin = Optional.empty();
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
@@ -397,7 +398,7 @@ public class GroupMetadataManager {
config,
groupConfigManager,
shareGroupAssignor,
authorizer,
authorizerPlugin,
streamsGroupAssignors
);
}
@@ -526,7 +527,7 @@ public class GroupMetadataManager {
/**
* The authorizer used to validate the regex subscription topics.
*/
private final Optional<Authorizer> authorizer;
private final Optional<Plugin<Authorizer>> authorizerPlugin;
private GroupMetadataManager(
SnapshotRegistry snapshotRegistry,
@@ -539,7 +540,7 @@ public class GroupMetadataManager {
GroupCoordinatorConfig config,
GroupConfigManager groupConfigManager,
ShareGroupPartitionAssignor shareGroupAssignor,
Optional<Authorizer> authorizer,
Optional<Plugin<Authorizer>> authorizerPlugin,
List<TaskAssignor> streamsGroupAssignors
) {
this.logContext = logContext;
@@ -561,7 +562,7 @@ public class GroupMetadataManager {
this.shareGroupPartitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupConfigManager = groupConfigManager;
this.shareGroupAssignor = shareGroupAssignor;
this.authorizer = authorizer;
this.authorizerPlugin = authorizerPlugin;
this.streamsGroupAssignors = streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name, Function.identity()));
}
@@ -3313,7 +3314,7 @@ public class GroupMetadataManager {
Set<String> regexes = Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
executor.schedule(
key,
() -> refreshRegularExpressions(context, groupId, log, time, metadataImage, authorizer, regexes),
() -> refreshRegularExpressions(context, groupId, log, time, metadataImage, authorizerPlugin, regexes),
(result, exception) -> handleRegularExpressionsResult(groupId, memberId, result, exception)
);
}
@@ -3331,7 +3332,7 @@ public class GroupMetadataManager {
* @param log The log instance.
* @param time The time instance.
* @param image The metadata image to use for listing the topics.
* @param authorizer The authorizer.
* @param authorizerPlugin The authorizer plugin.
* @param regexes The list of regular expressions that must be resolved.
* @return The list of resolved regular expressions.
*
@@ -3343,7 +3344,7 @@ public class GroupMetadataManager {
Logger log,
Time time,
MetadataImage image,
Optional<Authorizer> authorizer,
Optional<Plugin<Authorizer>> authorizerPlugin,
Set<String> regexes
) {
long startTimeMs = time.milliseconds();
@@ -3374,7 +3375,7 @@ public class GroupMetadataManager {
filterTopicDescribeAuthorizedTopics(
context,
authorizer,
authorizerPlugin,
resolvedRegexes
);
@@ -3399,15 +3400,15 @@ public class GroupMetadataManager {
* that the member is authorized to describe.
*
* @param context The request context.
* @param authorizer The authorizer.
* @param authorizerPlugin The authorizer plugin.
* @param resolvedRegexes The map of the regex pattern and its set of matched topics.
*/
private static void filterTopicDescribeAuthorizedTopics(
RequestContext context,
Optional<Authorizer> authorizer,
Optional<Plugin<Authorizer>> authorizerPlugin,
Map<String, Set<String>> resolvedRegexes
) {
if (authorizer.isEmpty()) return;
if (authorizerPlugin.isEmpty()) return;
Map<String, Integer> topicNameCount = new HashMap<>();
resolvedRegexes.values().forEach(topicNames ->
@@ -3421,7 +3422,7 @@ public class GroupMetadataManager {
return new Action(DESCRIBE, resource, entry.getValue(), true, false);
}).collect(Collectors.toList());
List<AuthorizationResult> authorizationResults = authorizer.get().authorize(context, actions);
List<AuthorizationResult> authorizationResults = authorizerPlugin.get().get().authorize(context, actions);
Set<String> deniedTopics = new HashSet<>();
IntStream.range(0, actions.size()).forEach(i -> {
if (authorizationResults.get(i) == AuthorizationResult.DENIED) {

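With the coordinator holding Optional<Plugin<Authorizer>>, reaching authorize() now takes two unwraps: one get() on the Optional (safe after the isEmpty() guard above) and one on the Plugin. The chain in isolation, as a fragment with context and actions assumed in scope:

// authorizerPlugin.get() -> Plugin<Authorizer>; .get() again -> the wrapped Authorizer.
// authorize() returns one AuthorizationResult per action, in request order.
List<AuthorizationResult> authorizationResults =
    authorizerPlugin.get().get().authorize(context, actions);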
View File

@@ -37,6 +37,7 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@@ -20267,13 +20268,14 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
Authorizer authorizer = mock(Authorizer.class);
Plugin<Authorizer> authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build(12345L))
.withAuthorizer(authorizer)
.withAuthorizerPlugin(authorizerPlugin)
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)

View File

@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@@ -472,7 +473,7 @@ public class GroupMetadataManagerTestContext {
private ShareGroupPartitionAssignor shareGroupAssignor = new MockPartitionAssignor("share");
private final List<ShareGroupBuilder> shareGroupBuilders = new ArrayList<>();
private final Map<String, Object> config = new HashMap<>();
private Optional<Authorizer> authorizer = Optional.empty();
private Optional<Plugin<Authorizer>> authorizerPlugin = Optional.empty();
private List<TaskAssignor> streamsGroupAssignors = Collections.singletonList(new MockTaskAssignor("mock"));
public Builder withConfig(String key, Object value) {
@@ -505,8 +506,8 @@ public class GroupMetadataManagerTestContext {
return this;
}
public Builder withAuthorizer(Authorizer authorizer) {
this.authorizer = Optional.of(authorizer);
public Builder withAuthorizerPlugin(Plugin<Authorizer> authorizerPlugin) {
this.authorizerPlugin = Optional.of(authorizerPlugin);
return this;
}
@@ -544,7 +545,7 @@ public class GroupMetadataManagerTestContext {
.withGroupCoordinatorMetricsShard(metrics)
.withShareGroupAssignor(shareGroupAssignor)
.withGroupConfigManager(groupConfigManager)
.withAuthorizer(authorizer)
.withAuthorizerPlugin(authorizerPlugin)
.withStreamsGroupAssignors(streamsGroupAssignors)
.build(),
groupConfigManager

View File

@@ -195,7 +195,7 @@ public class KRaftMetadataRequestBenchmark {
setConfigRepository(new MockConfigRepository()).
setMetadataCache(metadataCache).
setMetrics(metrics).
setAuthorizer(Optional.empty()).
setAuthorizerPlugin(Optional.empty()).
setQuotas(quotaManagers).
setFetchManager(fetchManager).
setSharePartitionManager(sharePartitionManager).

View File

@@ -23,6 +23,12 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
@@ -38,16 +44,16 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
/**
* The standard authorizer which is used in KRaft-based clusters if no other authorizer is
* configured.
* Built-in authorizer implementation that stores ACLs in the metadata log.
*/
public class StandardAuthorizer implements ClusterMetadataAuthorizer {
public class StandardAuthorizer implements ClusterMetadataAuthorizer, Monitorable {
public static final String SUPER_USERS_CONFIG = "super.users";
public static final String ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG = "allow.everyone.if.no.acl.found";
@@ -64,6 +70,8 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
*/
private volatile StandardAuthorizerData data = StandardAuthorizerData.createEmpty();
private AuthorizerMetrics authorizerMetrics;
@Override
public void setAclMutator(AclMutator aclMutator) {
this.data = data.copyWithNewAclMutator(aclMutator);
@@ -141,6 +149,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
for (Action action : actions) {
AuthorizationResult result = curData.authorize(requestContext, action);
results.add(result);
authorizerMetrics.recordAuthorizerMetrics(result);
}
return results;
}
@@ -207,4 +216,45 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
if (configValue == null) return DENIED;
return Boolean.parseBoolean(configValue.toString().trim()) ? ALLOWED : DENIED;
}
@Override
public void withPluginMetrics(PluginMetrics metrics) {
this.authorizerMetrics = new AuthorizerMetrics(metrics);
}
private class AuthorizerMetrics {
private final Sensor authorizationAllowedSensor;
private final Sensor authorizationDeniedSensor;
private final Sensor authorizationRequestSensor;
private AuthorizerMetrics(PluginMetrics metrics) {
authorizationAllowedSensor = metrics.addSensor("authorizer-authorization-allowed");
authorizationAllowedSensor.add(
metrics.metricName("authorization-allowed-rate-per-minute", "The number of authorization allowed per minute", Map.of()),
new Rate(TimeUnit.MINUTES, new WindowedCount()));
authorizationDeniedSensor = metrics.addSensor("authorizer-authorization-denied");
authorizationDeniedSensor.add(
metrics.metricName("authorization-denied-rate-per-minute", "The number of authorization denied per minute", Map.of()),
new Rate(TimeUnit.MINUTES, new WindowedCount()));
authorizationRequestSensor = metrics.addSensor("authorizer-authorization-request");
authorizationRequestSensor.add(
metrics.metricName("authorization-request-rate-per-minute", "The number of authorization request per minute", Map.of()),
new Rate(TimeUnit.MINUTES, new WindowedCount()));
metrics.addMetric(
metrics.metricName("acls-total-count", "The number of acls defined", Map.of()),
(Gauge<Integer>) (config, now) -> aclCount());
}
private void recordAuthorizerMetrics(AuthorizationResult authorizationResult) {
if (authorizationResult == ALLOWED) {
authorizationAllowedSensor.record();
} else {
authorizationDeniedSensor.record();
}
authorizationRequestSensor.record();
}
}
}
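Because the plugin framework registers these sensors in the server's shared registry under the "plugins" group (as the integration test below checks), they can be read back like any other Kafka metric. A hedged sketch of looking up the allowed rate by metric name:

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;

class AuthorizerMetricsLookupSketch {
    // Returns the current allowed-per-minute rate, or null if the authorizer
    // has not registered its metrics yet.
    static Object allowedRate(Metrics metrics) {
        for (MetricName name : metrics.metrics().keySet()) {
            if ("authorization-allowed-rate-per-minute".equals(name.name())) {
                return metrics.metric(name).metricValue();
            }
        }
        return null;
    }
}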

View File

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.metadata.publisher;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
@@ -36,14 +37,14 @@ public class AclPublisher implements MetadataPublisher {
private final int nodeId;
private final FaultHandler faultHandler;
private final String nodeType;
private final Optional<ClusterMetadataAuthorizer> authorizer;
private final Optional<Plugin<Authorizer>> authorizer;
private boolean completedInitialLoad = false;
public AclPublisher(int nodeId, FaultHandler faultHandler, String nodeType, Optional<Authorizer> authorizer) {
public AclPublisher(int nodeId, FaultHandler faultHandler, String nodeType, Optional<Plugin<Authorizer>> authorizer) {
this.nodeId = nodeId;
this.faultHandler = faultHandler;
this.nodeType = nodeType;
this.authorizer = authorizer.filter(ClusterMetadataAuthorizer.class::isInstance).map(ClusterMetadataAuthorizer.class::cast);
this.authorizer = authorizer.filter(plugin -> plugin.get() instanceof ClusterMetadataAuthorizer);
this.log = new LogContext(name()).logger(AclPublisher.class);
}
@@ -63,7 +64,8 @@ public class AclPublisher implements MetadataPublisher {
// we want to apply those changes in that order, not the reverse order! Otherwise
// there could be a window during which incorrect authorization results are returned.
Optional.ofNullable(delta.aclsDelta()).ifPresent(aclsDelta -> {
authorizer.ifPresent(clusterMetadataAuthorizer -> {
authorizer.ifPresent(authorizer -> {
ClusterMetadataAuthorizer clusterMetadataAuthorizer = (ClusterMetadataAuthorizer) authorizer.get();
if (manifest.type().equals(LoaderManifestType.SNAPSHOT)) {
try {
// If the delta resulted from a snapshot load, we want to apply the new changes
@@ -103,6 +105,9 @@ public class AclPublisher implements MetadataPublisher {
@Override
public void close() {
authorizer.ifPresent(clusterMetadataAuthorizer -> clusterMetadataAuthorizer.completeInitialLoad(new TimeoutException()));
authorizer.ifPresent(authorizer -> {
ClusterMetadataAuthorizer clusterMetadataAuthorizer = (ClusterMetadataAuthorizer) authorizer.get();
clusterMetadataAuthorizer.completeInitialLoad(new TimeoutException());
});
}
}
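The publisher keeps only plugins whose wrapped instance is a ClusterMetadataAuthorizer, then casts at each use. The filter-then-cast pattern in isolation, as a sketch of the close() path above:

import java.util.Optional;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.server.authorizer.Authorizer;

class AclPublisherCloseSketch {
    static void completeWithTimeout(Optional<Plugin<Authorizer>> authorizer) {
        authorizer
            .filter(plugin -> plugin.get() instanceof ClusterMetadataAuthorizer)
            .ifPresent(plugin -> ((ClusterMetadataAuthorizer) plugin.get())
                .completeInitialLoad(new TimeoutException()));
    }
}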

View File

@@ -27,6 +27,8 @@ import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.AuthorizerNotReadyException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
@@ -49,8 +51,6 @@ import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -101,12 +101,9 @@ public class StandardAuthorizerTest {
"127.0.0.1",
9020);
public static class AuthorizerTestServerInfo implements AuthorizerServerInfo {
private final Collection<Endpoint> endpoints;
public AuthorizerTestServerInfo(Collection<Endpoint> endpoints) {
public record AuthorizerTestServerInfo(Collection<Endpoint> endpoints) implements AuthorizerServerInfo {
public AuthorizerTestServerInfo {
assertFalse(endpoints.isEmpty());
this.endpoints = endpoints;
}
@Override
@@ -119,11 +116,6 @@ public class StandardAuthorizerTest {
return 0;
}
@Override
public Collection<Endpoint> endpoints() {
return endpoints;
}
@Override
public Endpoint interBrokerEndpoint() {
return endpoints.iterator().next();
@@ -141,15 +133,17 @@ public class StandardAuthorizerTest {
}
}
private final Metrics metrics = new Metrics();
@Test
public void testGetConfiguredSuperUsers() {
assertEquals(Set.of(),
getConfiguredSuperUsers(Map.of()));
assertEquals(Set.of(),
getConfiguredSuperUsers(Map.of(SUPER_USERS_CONFIG, " ")));
assertEquals(new HashSet<>(List.of("User:bob", "User:alice")),
assertEquals(Set.of("User:bob", "User:alice"),
getConfiguredSuperUsers(Map.of(SUPER_USERS_CONFIG, "User:bob;User:alice ")));
assertEquals(new HashSet<>(List.of("User:bob", "User:alice")),
assertEquals(Set.of("User:bob", "User:alice"),
getConfiguredSuperUsers(Map.of(SUPER_USERS_CONFIG, "; User:bob ; User:alice ")));
assertEquals("expected a string in format principalType:principalName but got bob",
assertThrows(IllegalArgumentException.class, () -> getConfiguredSuperUsers(
@@ -167,13 +161,10 @@ public class StandardAuthorizerTest {
@Test
public void testAllowEveryoneIfNoAclFoundConfigEnabled() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
HashMap<String, Object> configs = new HashMap<>();
configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
authorizer.configure(configs);
authorizer.start(new AuthorizerTestServerInfo(List.of(PLAINTEXT)));
authorizer.completeInitialLoad();
Map<String, Object> configs = Map.of(
SUPER_USERS_CONFIG, "User:alice;User:chris",
ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer(configs);
List<StandardAclWithId> acls = List.of(
withId(new StandardAcl(TOPIC, "topic1", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
@@ -197,13 +188,10 @@ public class StandardAuthorizerTest {
@Test
public void testAllowEveryoneIfNoAclFoundConfigDisabled() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
HashMap<String, Object> configs = new HashMap<>();
configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
authorizer.configure(configs);
authorizer.start(new AuthorizerTestServerInfo(List.of(PLAINTEXT)));
authorizer.completeInitialLoad();
Map<String, Object> configs = Map.of(
SUPER_USERS_CONFIG, "User:alice;User:chris",
ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer(configs);
List<StandardAclWithId> acls = List.of(
withId(new StandardAcl(TOPIC, "topic1", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
@@ -227,31 +215,35 @@ public class StandardAuthorizerTest {
@Test
public void testConfigure() {
StandardAuthorizer authorizer = new StandardAuthorizer();
HashMap<String, Object> configs = new HashMap<>();
configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
authorizer.configure(configs);
assertEquals(new HashSet<>(List.of("User:alice", "User:chris")), authorizer.superUsers());
Map<String, Object> configs = Map.of(
SUPER_USERS_CONFIG, "User:alice;User:chris",
ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer(configs);
assertEquals(Set.of("User:alice", "User:chris"), authorizer.superUsers());
assertEquals(ALLOWED, authorizer.defaultResult());
}
static Action newAction(AclOperation aclOperation,
private static Action newAction(AclOperation aclOperation,
ResourceType resourceType,
String resourceName) {
return new Action(aclOperation,
new ResourcePattern(resourceType, resourceName, LITERAL), 1, false, false);
}
static StandardAuthorizer createAndInitializeStandardAuthorizer() {
private StandardAuthorizer createAndInitializeStandardAuthorizer() {
return createAndInitializeStandardAuthorizer(Map.of(SUPER_USERS_CONFIG, "User:superman"));
}
private StandardAuthorizer createAndInitializeStandardAuthorizer(Map<String, Object> configs) {
StandardAuthorizer authorizer = new StandardAuthorizer();
authorizer.configure(Map.of(SUPER_USERS_CONFIG, "User:superman"));
authorizer.configure(configs);
authorizer.withPluginMetrics(new PluginMetricsImpl(metrics, Map.of()));
authorizer.start(new AuthorizerTestServerInfo(List.of(PLAINTEXT)));
authorizer.completeInitialLoad();
return authorizer;
}
static StandardAcl newFooAcl(AclOperation op, AclPermissionType permission) {
private static StandardAcl newFooAcl(AclOperation op, AclPermissionType permission) {
return new StandardAcl(
TOPIC,
"foo_",
@@ -262,7 +254,7 @@ public class StandardAuthorizerTest {
permission);
}
static StandardAclWithId withId(StandardAcl acl) {
private static StandardAclWithId withId(StandardAcl acl) {
return new StandardAclWithId(new Uuid(acl.hashCode(), acl.hashCode()), acl);
}
@@ -311,7 +303,7 @@ public class StandardAuthorizerTest {
newFooAcl(ALTER_CONFIGS, DENY)));
}
static StandardAcl newBarAcl(AclOperation op, AclPermissionType permission) {
private static StandardAcl newBarAcl(AclOperation op, AclPermissionType permission) {
return new StandardAcl(
GROUP,
"bar",
@@ -641,7 +633,7 @@ public class StandardAuthorizerTest {
authorizer.configure(Map.of(SUPER_USERS_CONFIG, "User:superman"));
Map<Endpoint, ? extends CompletionStage<Void>> futures2 = authorizer.
start(new AuthorizerTestServerInfo(List.of(PLAINTEXT, CONTROLLER)));
assertEquals(new HashSet<>(List.of(PLAINTEXT, CONTROLLER)), futures2.keySet());
assertEquals(Set.of(PLAINTEXT, CONTROLLER), futures2.keySet());
assertFalse(futures2.get(PLAINTEXT).toCompletableFuture().isDone());
assertTrue(futures2.get(CONTROLLER).toCompletableFuture().isDone());
}
@@ -656,6 +648,7 @@ public class StandardAuthorizerTest {
public void testAuthorizationPriorToCompleteInitialLoad() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
authorizer.configure(Map.of(SUPER_USERS_CONFIG, "User:superman"));
authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), Map.of()));
assertThrows(AuthorizerNotReadyException.class, () ->
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
@@ -687,7 +680,7 @@ public class StandardAuthorizerTest {
authorizer.configure(Map.of(SUPER_USERS_CONFIG, "User:superman"));
Map<Endpoint, ? extends CompletionStage<Void>> futures = authorizer.
start(new AuthorizerTestServerInfo(List.of(PLAINTEXT, CONTROLLER)));
assertEquals(new HashSet<>(List.of(PLAINTEXT, CONTROLLER)), futures.keySet());
assertEquals(Set.of(PLAINTEXT, CONTROLLER), futures.keySet());
assertFalse(futures.get(PLAINTEXT).toCompletableFuture().isDone());
assertTrue(futures.get(CONTROLLER).toCompletableFuture().isDone());
authorizer.completeInitialLoad(new TimeoutException("timed out"));
@@ -723,4 +716,17 @@ public class StandardAuthorizerTest {
newAction(WRITE, TOPIC, "bar"),
newAction(READ, TOPIC, "baz"))));
}
@Test
public void testAuthorizerMetrics() throws Exception {
// There is always 1 metric registered by default: the count of metrics in the registry
assertEquals(1, metrics.metrics().size());
StandardAuthorizer authorizer = createAndInitializeStandardAuthorizer();
assertEquals(List.of(ALLOWED), authorizer.authorize(
new MockAuthorizableRequestContext.Builder().setPrincipal(new KafkaPrincipal(USER_TYPE, "superman")).build(),
List.of(newAction(READ, TOPIC, "green"))));
// StandardAuthorizer registers 4 metrics, on top of the default count metric
assertEquals(5, metrics.metrics().size());
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.kafka.server.network;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
@@ -79,17 +80,17 @@ public class EndpointReadyFutures {
/**
* Build the EndpointReadyFutures object.
*
* @param authorizer The authorizer to use, if any. Will be started.
* @param authorizerPlugin The authorizer to use, if any. Will be started.
* @param info Server information to be passed to the authorizer.
*
* @return The new futures object.
*/
public EndpointReadyFutures build(
Optional<Authorizer> authorizer,
Optional<Plugin<Authorizer>> authorizerPlugin,
AuthorizerServerInfo info
) {
if (authorizer.isPresent()) {
return build(authorizer.get().start(info), info);
if (authorizerPlugin.isPresent()) {
return build(authorizerPlugin.get().get().start(info), info);
} else {
return build(Map.of(), info);
}

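When no authorizer is configured, build() falls through to the empty-map path and no authorizer futures gate endpoint readiness. A sketch of that path; the Builder usage is assumed from the surrounding class:

// Hypothetical caller; serverInfo is an AuthorizerServerInfo built elsewhere.
EndpointReadyFutures futures = new EndpointReadyFutures.Builder()
    .build(Optional.empty(), serverInfo);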
View File

@@ -16,13 +16,19 @@
*/
package org.apache.kafka.security.authorizer;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.Authorizer;
import java.util.Map;
public class AuthorizerUtils {
public static Authorizer createAuthorizer(String className) throws ClassNotFoundException {
return Utils.newInstance(className, Authorizer.class);
public static Plugin<Authorizer> createAuthorizer(String className, Map<String, Object> configs, Metrics metrics, String key, String role) throws ClassNotFoundException {
Authorizer authorizer = Utils.newInstance(className, Authorizer.class);
authorizer.configure(configs);
return Plugin.wrapInstance(authorizer, metrics, key, "role", role);
}
public static boolean isClusterResource(String name) {

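Call sites now receive a configured, metrics-wrapped plugin in one step, with the role recorded as an extra metric tag next to config and class (exercised by the test below). A usage sketch for the broker role:

import java.util.Map;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.security.authorizer.AuthorizerUtils;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.config.ServerConfigs;

class CreateAuthorizerSketch {
    static Plugin<Authorizer> forBroker(Metrics metrics) throws ClassNotFoundException {
        return AuthorizerUtils.createAuthorizer(
            "org.apache.kafka.metadata.authorizer.StandardAuthorizer",
            Map.of(),                                    // authorizer configs
            metrics,                                     // shared metrics registry
            ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,  // becomes the "config" tag
            "broker");                                   // becomes the "role" tag
    }
}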
View File

@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.security.authorizer;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.server.ProcessRole;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.config.ServerConfigs;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AuthorizerUtilsTest {
@ParameterizedTest
@EnumSource(ProcessRole.class)
public void testCreateAuthorizer(ProcessRole role) throws ClassNotFoundException {
Map<String, Object> configs = Map.of();
Metrics metrics = new Metrics();
assertEquals(1, metrics.metrics().size());
Plugin<Authorizer> authorizer = AuthorizerUtils.createAuthorizer(
MonitorableAuthorizer.class.getName(),
configs,
metrics,
ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,
role.toString());
assertInstanceOf(MonitorableAuthorizer.class, authorizer.get());
MonitorableAuthorizer monitorableAuthorizer = (MonitorableAuthorizer) authorizer.get();
assertTrue(monitorableAuthorizer.configured);
assertNotNull(monitorableAuthorizer.metricName);
MetricName metricName = null;
for (MetricName name : metrics.metrics().keySet()) {
if (name.name().equals(monitorableAuthorizer.metricName.name())) {
metricName = name;
}
}
assertNotNull(metricName);
assertEquals(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, metricName.tags().get("config"));
assertEquals(MonitorableAuthorizer.class.getSimpleName(), metricName.tags().get("class"));
assertEquals(role.toString(), metricName.tags().get("role"));
assertTrue(metricName.tags().entrySet().containsAll(monitorableAuthorizer.extraTags.entrySet()));
assertEquals(0, metrics.metric(metricName).metricValue());
monitorableAuthorizer.authorize(null, null);
assertEquals(1, metrics.metric(metricName).metricValue());
}
public static class MonitorableAuthorizer implements Authorizer, Monitorable {
private final Map<String, String> extraTags = Map.of("k1", "v1");
private final AtomicInteger counter = new AtomicInteger();
private boolean configured = false;
private MetricName metricName = null;
@Override
public void withPluginMetrics(PluginMetrics metrics) {
assertTrue(configured);
metricName = metrics.metricName("authorize calls", "Number of times authorize was called", extraTags);
metrics.addMetric(metricName, (Gauge<Integer>) (config, now) -> counter.get());
}
@Override
public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) {
return Map.of();
}
@Override
public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
counter.incrementAndGet();
return List.of();
}
@Override
public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
return List.of();
}
@Override
public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
return List.of();
}
@Override
public Iterable<AclBinding> acls(AclBindingFilter filter) {
return null;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
configured = true;
}
}
}

View File

@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.replica.RackAwareReplicaSelector;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG;
import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class MonitorablePluginsIntegrationTest {
private static int controllerId(Type type) {
return type == Type.KRAFT ? 3000 : 0;
}
private static Map<String, String> expectedTags(String config, String clazz) {
return expectedTags(config, clazz, Map.of());
}
private static Map<String, String> expectedTags(String config, String clazz, Map<String, String> extraTags) {
Map<String, String> tags = new LinkedHashMap<>();
tags.put("config", config);
tags.put("class", clazz);
tags.putAll(extraTags);
return tags;
}
@ClusterTest(
types = {Type.KRAFT, Type.CO_KRAFT},
serverProperties = {
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"),
@ClusterConfigProperty(key = REPLICA_SELECTOR_CLASS_CONFIG, value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableReplicaSelector")
}
)
public void testMonitorableServerPlugins(ClusterInstance clusterInstance) {
assertAuthorizerMetrics(clusterInstance);
assertReplicaSelectorMetrics(clusterInstance);
}
private void assertAuthorizerMetrics(ClusterInstance clusterInstance) {
assertMetrics(
clusterInstance.brokers().get(0).metrics(),
4,
expectedTags(AUTHORIZER_CLASS_NAME_CONFIG, "StandardAuthorizer", Map.of("role", "broker")));
assertMetrics(
clusterInstance.controllers().get(controllerId(clusterInstance.type())).metrics(),
4,
expectedTags(AUTHORIZER_CLASS_NAME_CONFIG, "StandardAuthorizer", Map.of("role", "controller")));
}
private void assertReplicaSelectorMetrics(ClusterInstance clusterInstance) {
assertMetrics(
clusterInstance.brokers().get(0).metrics(),
MonitorableReplicaSelector.METRICS_COUNT,
expectedTags(REPLICA_SELECTOR_CLASS_CONFIG, MonitorableReplicaSelector.class.getSimpleName()));
}
private void assertMetrics(Metrics metrics, int expected, Map<String, String> expectedTags) {
int found = 0;
for (MetricName metricName : metrics.metrics().keySet()) {
if (metricName.group().equals("plugins")) {
Map<String, String> tags = metricName.tags();
if (expectedTags.equals(tags)) {
found++;
}
}
}
assertEquals(expected, found);
}
public static class MonitorableReplicaSelector extends RackAwareReplicaSelector implements Monitorable {
private static final int METRICS_COUNT = 1;
@Override
public void withPluginMetrics(PluginMetrics metrics) {
MetricName name = metrics.metricName("name", "description", Map.of());
metrics.addMetric(name, (Measurable) (config, now) -> 123);
}
}
}

View File

@@ -357,11 +357,11 @@ public interface ClusterInstance {
default List<Authorizer> authorizers() {
List<Authorizer> authorizers = new ArrayList<>();
authorizers.addAll(brokers().values().stream()
.filter(server -> server.authorizer().isDefined())
.map(server -> server.authorizer().get()).toList());
.filter(server -> server.authorizerPlugin().isDefined())
.map(server -> server.authorizerPlugin().get().get()).toList());
authorizers.addAll(controllers().values().stream()
.filter(server -> server.authorizer().isDefined())
.map(server -> server.authorizer().get()).toList());
.filter(server -> server.authorizerPlugin().isDefined())
.map(server -> server.authorizerPlugin().get().get()).toList());
return authorizers;
}