diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml index c395a5f2214..0fad8c16273 100644 --- a/checkstyle/import-control-server.xml +++ b/checkstyle/import-control-server.xml @@ -18,7 +18,7 @@ limitations under the License. --> - + @@ -77,9 +77,17 @@ - - - + + + + + + + + + + + diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index f835475fa9e..2e867d76dd2 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -20,7 +20,7 @@ package kafka.admin import java.util.Properties import joptsimple._ import joptsimple.util.EnumConverter -import kafka.security.authorizer.{AclAuthorizer, AclEntry, AuthorizerUtils} +import kafka.security.authorizer.{AclAuthorizer, AclEntry} import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} @@ -31,6 +31,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceP import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils} +import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 6c08798031f..007049814cb 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -17,7 +17,6 @@ package kafka.network -import java.net.InetAddress import java.nio.ByteBuffer import java.util.concurrent._ import com.fasterxml.jackson.databind.JsonNode @@ -34,8 +33,8 @@ import org.apache.kafka.common.message.EnvelopeResponseData import org.apache.kafka.common.network.{ClientInformation, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ -import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.{Sanitizer, Time} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.network.Session import org.apache.kafka.server.metrics.KafkaMetricsGroup import java.util @@ -57,10 +56,6 @@ object RequestChannel extends Logging { case object ShutdownRequest extends BaseRequest case object WakeupRequest extends BaseRequest - case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) { - val sanitizedUser: String = Sanitizer.sanitize(principal.getName) - } - class Metrics(enabledApis: Iterable[ApiKeys]) { def this(scope: ListenerType) = { this(ApiKeys.apisForListener(scope).asScala) @@ -103,7 +98,7 @@ object RequestChannel extends Logging { @volatile var callbackRequestDequeueTimeNanos: Option[Long] = None @volatile var callbackRequestCompleteTimeNanos: Option[Long] = None - val session = Session(context.principal, context.clientAddress) + val session = new Session(context.principal, context.clientAddress) private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index bf120376342..cd42b17809b 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -19,10 +19,10 @@ package kafka.network import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode} -import kafka.network.RequestChannel.Session import org.apache.kafka.common.message._ import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.requests._ +import org.apache.kafka.network.Session object RequestConvertToJson { def request(request: AbstractRequest): JsonNode = { diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala deleted file mode 100644 index 0e417d677eb..00000000000 --- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.security.authorizer - -import java.net.InetAddress - -import kafka.network.RequestChannel.Session -import org.apache.kafka.common.resource.Resource -import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer} - - -object AuthorizerUtils { - - def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer]) - - def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME) - - def sessionToRequestContext(session: Session): AuthorizableRequestContext = { - new AuthorizableRequestContext { - override def clientId(): String = "" - override def requestType(): Int = -1 - override def listenerName(): String = "" - override def clientAddress(): InetAddress = session.clientAddress - override def principal(): KafkaPrincipal = session.principal - override def securityProtocol(): SecurityProtocol = null - override def correlationId(): Int = -1 - override def requestVersion(): Int = -1 - } - } -} diff --git a/core/src/main/scala/kafka/server/AclApis.scala b/core/src/main/scala/kafka/server/AclApis.scala index 485cafeca20..fd58bc8ba83 100644 --- a/core/src/main/scala/kafka/server/AclApis.scala +++ b/core/src/main/scala/kafka/server/AclApis.scala @@ -18,7 +18,6 @@ package kafka.server import kafka.network.RequestChannel -import kafka.security.authorizer.AuthorizerUtils import kafka.utils.Logging import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.acl.AclBinding @@ -30,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType +import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.authorizer._ import java.util diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 5169509ea21..a10832d9690 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.network.RequestChannel -import kafka.network.RequestChannel._ import kafka.server.ClientQuotaManager._ import kafka.utils.{Logging, QuotaUtils} import org.apache.kafka.common.{Cluster, MetricName} @@ -33,6 +32,7 @@ import org.apache.kafka.common.utils.{Sanitizer, Time} import org.apache.kafka.server.config.{ConfigEntityName, ClientQuotaManagerConfig} import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType} import org.apache.kafka.server.util.ShutdownableThread +import org.apache.kafka.network.Session import scala.jdk.CollectionConverters._ diff --git a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala index b12d5aa4595..e1d38eb65ee 100644 --- a/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala @@ -17,7 +17,6 @@ package kafka.server import kafka.network.RequestChannel -import kafka.network.RequestChannel.Session import org.apache.kafka.common.MetricName import org.apache.kafka.common.errors.ThrottlingQuotaExceededException import org.apache.kafka.common.metrics.Metrics @@ -27,6 +26,7 @@ import org.apache.kafka.common.metrics.stats.Rate import org.apache.kafka.common.metrics.stats.TokenBucket import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.utils.Time +import org.apache.kafka.network.Session import org.apache.kafka.server.quota.ClientQuotaCallback import org.apache.kafka.server.config.ClientQuotaManagerConfig diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a9cda78dd0f..f7967ed8457 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -21,7 +21,6 @@ import java.{lang, util} import java.util.concurrent.TimeUnit import java.util.{Collections, Properties} import kafka.cluster.EndPoint -import kafka.security.authorizer.AuthorizerUtils import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp} import kafka.utils.CoreUtils.parseCsvList import kafka.utils.{CoreUtils, Logging} @@ -40,6 +39,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.assignor.PartitionAssignor import org.apache.kafka.raft.RaftConfig +import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.security.PasswordEncoderConfigs import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala index f829ce3570b..47f6783f2b9 100644 --- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala +++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala @@ -20,8 +20,7 @@ package kafka.security.token.delegation import java.net.InetAddress import java.nio.ByteBuffer import java.util.{Base64, Properties} -import kafka.network.RequestChannel.Session -import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils} +import kafka.security.authorizer.AclAuthorizer import kafka.security.authorizer.AclEntry.WildcardHost import kafka.server.{CreateTokenResult, DelegationTokenManager, DelegationTokenManagerZk, KafkaConfig, QuorumTestHarness} import kafka.utils.TestUtils @@ -38,6 +37,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{MockTime, SecurityUtils, Time} +import org.apache.kafka.network.Session +import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.config.Defaults import org.junit.jupiter.api.Assertions._ @@ -316,7 +317,7 @@ class DelegationTokenManagerTest extends QuorumTestHarness { assertEquals(1, tokens.size) //get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and with permissions - hostSession = Session(renewer2, InetAddress.getByName("192.168.1.1")) + hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1")) createAcl(new AclBinding(new ResourcePattern(DELEGATION_TOKEN, tokenId2, LITERAL), new AccessControlEntry(renewer2.toString, WildcardHost, DESCRIBE, ALLOW))) tokens = getTokens(tokenManager, aclAuthorizer, hostSession, renewer2, List(renewer2, renewer3)) diff --git a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala index edda22ff17c..af4e08f5586 100644 --- a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala @@ -19,9 +19,7 @@ package kafka.server import java.net.InetAddress import java.util import java.util.Collections - import kafka.network.RequestChannel -import kafka.network.RequestChannel.Session import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.metrics.{MetricConfig, Metrics} @@ -31,6 +29,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.MockTime +import org.apache.kafka.network.Session import org.junit.jupiter.api.AfterEach import org.mockito.Mockito.mock @@ -69,7 +68,7 @@ class BaseClientQuotaManagerTest { protected def buildSession(user: String): Session = { val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user) - Session(principal, null) + new Session(principal, null) } protected def maybeRecord(quotaManager: ClientQuotaManager, user: String, clientId: String, value: Double): Int = { diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 1dd1823b0f1..e67203c565f 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -17,12 +17,12 @@ package kafka.server import java.net.InetAddress -import kafka.network.RequestChannel.Session import kafka.server.QuotaType._ import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ConfigEntityName} +import org.apache.kafka.network.Session import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -143,7 +143,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { private def checkQuota(quotaManager: ClientQuotaManager, user: String, clientId: String, expectedBound: Long, value: Int, expectThrottle: Boolean): Unit = { assertEquals(expectedBound.toDouble, quotaManager.quota(user, clientId).bound, 0.0) - val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost) + val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost) val expectedMaxValueInQuotaWindow = if (expectedBound < Long.MaxValue) config.quotaWindowSizeSeconds * (config.numQuotaSamples - 1) * expectedBound.toDouble else Double.MaxValue @@ -161,7 +161,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { val numFullQuotaWindows = 3 // 3 seconds window (vs. 10 seconds default) val nonDefaultConfig = new ClientQuotaManagerConfig(numFullQuotaWindows + 1) val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, Fetch, time, "") - val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost) + val userSession = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost) try { // no quota set diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index d765dda5a0a..5ced592137c 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -15,7 +15,6 @@ package kafka.server import kafka.api.LeaderAndIsr -import kafka.network.RequestChannel.Session import kafka.security.authorizer.AclAuthorizer import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.common._ @@ -43,6 +42,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourceType => AdminResou import org.apache.kafka.common.security.auth._ import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils} import org.apache.kafka.metadata.authorizer.StandardAuthorizer +import org.apache.kafka.network.Session import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} @@ -223,7 +223,7 @@ class RequestQuotaTest extends BaseRequestTest { } } - def session(user: String): Session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null) + def session(user: String): Session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), null) private def throttleTimeMetricValue(clientId: String): Double = { throttleTimeMetricValueForQuotaType(clientId, QuotaType.Request) diff --git a/server/src/main/java/org/apache/kafka/network/Session.java b/server/src/main/java/org/apache/kafka/network/Session.java new file mode 100644 index 00000000000..1793e4d565c --- /dev/null +++ b/server/src/main/java/org/apache/kafka/network/Session.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; + +import java.net.InetAddress; + +public class Session { + public final KafkaPrincipal principal; + public final InetAddress clientAddress; + public final String sanitizedUser; + + public Session(KafkaPrincipal principal, InetAddress clientAddress) { + this.principal = principal; + this.clientAddress = clientAddress; + this.sanitizedUser = Sanitizer.sanitize(principal.getName()); + } +} diff --git a/server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java b/server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java new file mode 100644 index 00000000000..763f156158a --- /dev/null +++ b/server/src/main/java/org/apache/kafka/security/authorizer/AuthorizerUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.security.authorizer; + +import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; +import org.apache.kafka.server.authorizer.Authorizer; + +import java.net.InetAddress; + +public class AuthorizerUtils { + public static Authorizer createAuthorizer(String className) throws ClassNotFoundException { + return Utils.newInstance(className, Authorizer.class); + } + + public static boolean isClusterResource(String name) { + return name.equals(Resource.CLUSTER_NAME); + } + + public static AuthorizableRequestContext sessionToRequestContext(Session session) { + return new AuthorizableRequestContext() { + @Override + public String clientId() { + return ""; + } + + @Override + public int requestType() { + return -1; + } + + @Override + public String listenerName() { + return ""; + } + + @Override + public InetAddress clientAddress() { + return session.clientAddress; + } + + @Override + public KafkaPrincipal principal() { + return session.principal; + } + + @Override + public SecurityProtocol securityProtocol() { + return null; + } + + @Override + public int correlationId() { + return -1; + } + + @Override + public int requestVersion() { + return -1; + } + }; + } +}