KAFKA-15853: Move AuthorizerUtils and its dependencies to server module (#15167)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Omnia Ibrahim 2024-01-31 14:38:14 +00:00 committed by GitHub
parent cfc8257479
commit 127fe7d276
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 148 additions and 76 deletions

View File

@ -18,7 +18,7 @@
limitations under the License.
-->
<import-control pkg="org.apache.kafka.server">
<import-control pkg="org.apache.kafka">
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
@ -77,9 +77,17 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.raft" />
<subpackage name="metrics">
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
<allow pkg="org.apache.kafka.server.telemetry" />
<subpackage name="server">
<subpackage name="metrics">
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
<allow pkg="org.apache.kafka.server.telemetry" />
</subpackage>
</subpackage>
<subpackage name="security">
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.server.authorizer" />
</subpackage>
</import-control>

View File

@ -20,7 +20,7 @@ package kafka.admin
import java.util.Properties
import joptsimple._
import joptsimple.util.EnumConverter
import kafka.security.authorizer.{AclAuthorizer, AclEntry, AuthorizerUtils}
import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
@ -31,6 +31,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceP
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}

View File

@ -17,7 +17,6 @@
package kafka.network
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent._
import com.fasterxml.jackson.databind.JsonNode
@ -34,8 +33,8 @@ import org.apache.kafka.common.message.EnvelopeResponseData
import org.apache.kafka.common.network.{ClientInformation, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.Session
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util
@ -57,10 +56,6 @@ object RequestChannel extends Logging {
case object ShutdownRequest extends BaseRequest
case object WakeupRequest extends BaseRequest
case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
}
class Metrics(enabledApis: Iterable[ApiKeys]) {
def this(scope: ListenerType) = {
this(ApiKeys.apisForListener(scope).asScala)
@ -103,7 +98,7 @@ object RequestChannel extends Logging {
@volatile var callbackRequestDequeueTimeNanos: Option[Long] = None
@volatile var callbackRequestCompleteTimeNanos: Option[Long] = None
val session = Session(context.principal, context.clientAddress)
val session = new Session(context.principal, context.clientAddress)
private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)

View File

@ -19,10 +19,10 @@ package kafka.network
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode}
import kafka.network.RequestChannel.Session
import org.apache.kafka.common.message._
import org.apache.kafka.common.network.ClientInformation
import org.apache.kafka.common.requests._
import org.apache.kafka.network.Session
object RequestConvertToJson {
def request(request: AbstractRequest): JsonNode = {

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.authorizer
import java.net.InetAddress
import kafka.network.RequestChannel.Session
import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
object AuthorizerUtils {
def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])
def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)
def sessionToRequestContext(session: Session): AuthorizableRequestContext = {
new AuthorizableRequestContext {
override def clientId(): String = ""
override def requestType(): Int = -1
override def listenerName(): String = ""
override def clientAddress(): InetAddress = session.clientAddress
override def principal(): KafkaPrincipal = session.principal
override def securityProtocol(): SecurityProtocol = null
override def correlationId(): Int = -1
override def requestVersion(): Int = -1
}
}
}

View File

@ -18,7 +18,6 @@
package kafka.server
import kafka.network.RequestChannel
import kafka.security.authorizer.AuthorizerUtils
import kafka.utils.Logging
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclBinding
@ -30,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.authorizer._
import java.util

View File

@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.network.RequestChannel
import kafka.network.RequestChannel._
import kafka.server.ClientQuotaManager._
import kafka.utils.{Logging, QuotaUtils}
import org.apache.kafka.common.{Cluster, MetricName}
@ -33,6 +32,7 @@ import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.config.{ConfigEntityName, ClientQuotaManagerConfig}
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.network.Session
import scala.jdk.CollectionConverters._

View File

@ -17,7 +17,6 @@
package kafka.server
import kafka.network.RequestChannel
import kafka.network.RequestChannel.Session
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.metrics.Metrics
@ -27,6 +26,7 @@ import org.apache.kafka.common.metrics.stats.Rate
import org.apache.kafka.common.metrics.stats.TokenBucket
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.Session
import org.apache.kafka.server.quota.ClientQuotaCallback
import org.apache.kafka.server.config.ClientQuotaManagerConfig

View File

@ -21,7 +21,6 @@ import java.{lang, util}
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import kafka.cluster.EndPoint
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
import kafka.utils.CoreUtils.parseCsvList
import kafka.utils.{CoreUtils, Logging}
@ -40,6 +39,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer

View File

@ -20,8 +20,7 @@ package kafka.security.token.delegation
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.{Base64, Properties}
import kafka.network.RequestChannel.Session
import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils}
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{CreateTokenResult, DelegationTokenManager, DelegationTokenManagerZk, KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
@ -38,6 +37,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.common.utils.{MockTime, SecurityUtils, Time}
import org.apache.kafka.network.Session
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.Defaults
import org.junit.jupiter.api.Assertions._
@ -316,7 +317,7 @@ class DelegationTokenManagerTest extends QuorumTestHarness {
assertEquals(1, tokens.size)
//get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and with permissions
hostSession = Session(renewer2, InetAddress.getByName("192.168.1.1"))
hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1"))
createAcl(new AclBinding(new ResourcePattern(DELEGATION_TOKEN, tokenId2, LITERAL),
new AccessControlEntry(renewer2.toString, WildcardHost, DESCRIBE, ALLOW)))
tokens = getTokens(tokenManager, aclAuthorizer, hostSession, renewer2, List(renewer2, renewer3))

View File

@ -19,9 +19,7 @@ package kafka.server
import java.net.InetAddress
import java.util
import java.util.Collections
import kafka.network.RequestChannel
import kafka.network.RequestChannel.Session
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.{MetricConfig, Metrics}
@ -31,6 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.network.Session
import org.junit.jupiter.api.AfterEach
import org.mockito.Mockito.mock
@ -69,7 +68,7 @@ class BaseClientQuotaManagerTest {
protected def buildSession(user: String): Session = {
val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user)
Session(principal, null)
new Session(principal, null)
}
protected def maybeRecord(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = {

View File

@ -17,12 +17,12 @@
package kafka.server
import java.net.InetAddress
import kafka.network.RequestChannel.Session
import kafka.server.QuotaType._
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ConfigEntityName}
import org.apache.kafka.network.Session
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -143,7 +143,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
private def checkQuota(quotaManager: ClientQuotaManager, user: String, clientId: String, expectedBound: Long, value: Int, expectThrottle: Boolean): Unit = {
assertEquals(expectedBound.toDouble, quotaManager.quota(user, clientId).bound, 0.0)
val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost)
val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost)
val expectedMaxValueInQuotaWindow =
if (expectedBound < Long.MaxValue) config.quotaWindowSizeSeconds * (config.numQuotaSamples - 1) * expectedBound.toDouble
else Double.MaxValue
@ -161,7 +161,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val numFullQuotaWindows = 3 // 3 seconds window (vs. 10 seconds default)
val nonDefaultConfig = new ClientQuotaManagerConfig(numFullQuotaWindows + 1)
val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, Fetch, time, "")
val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost)
val userSession = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost)
try {
// no quota set

View File

@ -15,7 +15,6 @@
package kafka.server
import kafka.api.LeaderAndIsr
import kafka.network.RequestChannel.Session
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common._
@ -43,6 +42,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResou
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.network.Session
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@ -223,7 +223,7 @@ class RequestQuotaTest extends BaseRequestTest {
}
}
def session(user: String): Session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
def session(user: String): Session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null)
private def throttleTimeMetricValue(clientId: String): Double = {
throttleTimeMetricValueForQuotaType(clientId, QuotaType.Request)

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Sanitizer;
import java.net.InetAddress;
public class Session {
public final KafkaPrincipal principal;
public final InetAddress clientAddress;
public final String sanitizedUser;
public Session(KafkaPrincipal principal, InetAddress clientAddress) {
this.principal = principal;
this.clientAddress = clientAddress;
this.sanitizedUser = Sanitizer.sanitize(principal.getName());
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.security.authorizer;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.network.Session;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import java.net.InetAddress;
public class AuthorizerUtils {
public static Authorizer createAuthorizer(String className) throws ClassNotFoundException {
return Utils.newInstance(className, Authorizer.class);
}
public static boolean isClusterResource(String name) {
return name.equals(Resource.CLUSTER_NAME);
}
public static AuthorizableRequestContext sessionToRequestContext(Session session) {
return new AuthorizableRequestContext() {
@Override
public String clientId() {
return "";
}
@Override
public int requestType() {
return -1;
}
@Override
public String listenerName() {
return "";
}
@Override
public InetAddress clientAddress() {
return session.clientAddress;
}
@Override
public KafkaPrincipal principal() {
return session.principal;
}
@Override
public SecurityProtocol securityProtocol() {
return null;
}
@Override
public int correlationId() {
return -1;
}
@Override
public int requestVersion() {
return -1;
}
};
}
}