From f97f36b650ef75841164ba01d42e02955141655a Mon Sep 17 00:00:00 2001 From: Ryan Dielhenn <35785891+dielhennr@users.noreply.github.com> Date: Tue, 13 Jul 2021 10:54:36 -0700 Subject: [PATCH] KAFKA-13051; Require principal builders implement `KafkaPrincipalSerde` and set default (#11011) This patch adds a check to ensure that principal builder implementations implement `KafkaPrincipalSerde` as specified in KIP-590: https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller. This patch also changes the default value of `principal.builder.class` to `DefaultKafkaPrincipalBuilder`, which was already the implicit behavior when no principal builder was specified. Reviewers: Ismael Juma , Jason Gustafson --- core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +++++++++- .../kafka/api/AuthorizerIntegrationTest.scala | 5 +++-- .../kafka/api/GroupAuthorizerIntegrationTest.scala | 5 +++-- .../kafka/api/GroupEndToEndAuthorizationTest.scala | 5 +++-- .../kafka/api/PlaintextEndToEndAuthorizationTest.scala | 5 +++-- .../api/SaslPlainSslEndToEndAuthorizationTest.scala | 3 ++- .../kafka/api/SslEndToEndAuthorizationTest.scala | 3 ++- .../server/AlterUserScramCredentialsRequestTest.scala | 7 ++++--- .../kafka/server/ControllerMutationQuotaTest.scala | 4 ++-- .../DescribeUserScramCredentialsRequestTest.scala | 7 ++++--- docs/upgrade.html | 2 ++ 11 files changed, 37 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index cada6708ec2..634bb4d0653 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -39,7 +39,9 @@ import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.metrics.Sensor import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType} +import org.apache.kafka.common.security.auth.KafkaPrincipalSerde; import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; import org.apache.kafka.common.utils.Utils import org.apache.kafka.raft.RaftConfig import org.apache.kafka.server.authorizer.Authorizer @@ -247,6 +249,7 @@ object Defaults { /** ********* General Security configuration ***********/ val ConnectionsMaxReauthMsDefault = 0L + val DefaultPrincipalSerde = classOf[DefaultKafkaPrincipalBuilder] /** ********* Sasl configuration ***********/ val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM @@ -1235,7 +1238,7 @@ object KafkaConfig { .define(securityProviderClassProp, STRING, null, LOW, securityProviderClassDoc) /** ********* SSL Configuration ****************/ - .define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc) + .define(PrincipalBuilderClassProp, CLASS, Defaults.DefaultPrincipalSerde, MEDIUM, PrincipalBuilderClassDoc) .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc) .define(SslProviderProp, STRING, null, MEDIUM, SslProviderDoc) .define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc) @@ -1971,5 +1974,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" + s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" + s" authentication responses from timing out") + + val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp) + require(principalBuilderClass != null, s"${KafkaConfig.PrincipalBuilderClassProp} must be non-null") + require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass), + s"${KafkaConfig.PrincipalBuilderClassProp} must implement KafkaPrincipalSerde") } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 264f6eb71b3..e5f095df89b 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -54,7 +54,8 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED} import org.apache.kafka.common.resource.ResourceType._ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType} -import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol} +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid, requests} import org.apache.kafka.test.{TestUtils => JTestUtils} @@ -76,7 +77,7 @@ object AuthorizerIntegrationTest { val BrokerListenerName = "BROKER" val ClientListenerName = "CLIENT" - class PrincipalBuilder extends KafkaPrincipalBuilder { + class PrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { context.listenerName match { case BrokerListenerName => BrokerPrincipal diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala index 2b3ae1c1da6..4f767356d8c 100644 --- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala @@ -27,7 +27,8 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.errors.TopicAuthorizationException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} -import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal} +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, Test} @@ -40,7 +41,7 @@ object GroupAuthorizerIntegrationTest { val BrokerListenerName = "BROKER" val ClientListenerName = "CLIENT" - class GroupPrincipalBuilder extends KafkaPrincipalBuilder { + class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { context.listenerName match { case BrokerListenerName => BrokerPrincipal diff --git a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala index 05243f0a7ab..8bd639389cd 100644 --- a/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupEndToEndAuthorizationTest.scala @@ -19,12 +19,13 @@ package kafka.api import kafka.api.GroupEndToEndAuthorizationTest._ import kafka.utils.JaasTestUtils import org.apache.kafka.common.config.internals.BrokerSecurityConfigs -import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext} +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SaslAuthenticationContext} +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder object GroupEndToEndAuthorizationTest { val GroupPrincipalType = "Group" val ClientGroup = "testGroup" - class GroupPrincipalBuilder extends KafkaPrincipalBuilder { + class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { context match { case ctx: SaslAuthenticationContext => diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala index 842c4ad1f9f..c44028c8b7b 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala @@ -19,6 +19,7 @@ package kafka.api import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth._ +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.api.Assertions._ import org.apache.kafka.common.errors.TopicAuthorizationException @@ -30,7 +31,7 @@ object PlaintextEndToEndAuthorizationTest { private var clientListenerName = None: Option[String] @volatile private var serverListenerName = None: Option[String] - class TestClientPrincipalBuilder extends KafkaPrincipalBuilder { + class TestClientPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { clientListenerName = Some(context.listenerName) context match { @@ -42,7 +43,7 @@ object PlaintextEndToEndAuthorizationTest { } } - class TestServerPrincipalBuilder extends KafkaPrincipalBuilder { + class TestServerPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { serverListenerName = Some(context.listenerName) context match { diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala index 2e5411d2158..772780381ee 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -31,13 +31,14 @@ import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.network.Mode import org.apache.kafka.common.security.auth._ +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.common.security.plain.PlainAuthenticateCallback import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test object SaslPlainSslEndToEndAuthorizationTest { - class TestPrincipalBuilder extends KafkaPrincipalBuilder { + class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { val saslContext = context.asInstanceOf[SaslAuthenticationContext] diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index a47ac0b8898..65bef416569 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -24,11 +24,12 @@ import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.network.Mode import org.apache.kafka.common.security.auth._ +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.common.utils.Java import org.junit.jupiter.api.BeforeEach object SslEndToEndAuthorizationTest { - class TestPrincipalBuilder extends KafkaPrincipalBuilder { + class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { private val Pattern = "O=A (.*?),CN=(.*?)".r // Use full DN as client principal to test special characters in principal diff --git a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala index 99d40bac6af..e0121b17c5d 100644 --- a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala @@ -28,7 +28,8 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData. import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse} -import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal} +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -383,13 +384,13 @@ object AlterCredentialsTest { } } - class TestPrincipalBuilderReturningAuthorized extends KafkaPrincipalBuilder { + class TestPrincipalBuilderReturningAuthorized extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { AuthorizedPrincipal } } - class TestPrincipalBuilderReturningUnauthorized extends KafkaPrincipalBuilder { + class TestPrincipalBuilderReturningUnauthorized extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { UnauthorizedPrincipal } diff --git a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala index 6465028b8d5..20e89ed5564 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala @@ -40,7 +40,7 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest import org.apache.kafka.common.requests.DeleteTopicsResponse import org.apache.kafka.common.security.auth.AuthenticationContext import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.test.{TestUtils => JTestUtils} import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertTrue @@ -53,7 +53,7 @@ import scala.jdk.CollectionConverters._ object ControllerMutationQuotaTest { // Principal used for all client connections. This is updated by each test. var principal = KafkaPrincipal.ANONYMOUS - class TestPrincipalBuilder extends KafkaPrincipalBuilder { + class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { principal } diff --git a/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala index e4f28e2ff86..012f83303e7 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala @@ -24,7 +24,8 @@ import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse} -import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal} +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -125,13 +126,13 @@ object DescribeCredentialsTest { } } - class TestPrincipalBuilderReturningAuthorized extends KafkaPrincipalBuilder { + class TestPrincipalBuilderReturningAuthorized extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { AuthorizedPrincipal } } - class TestPrincipalBuilderReturningUnauthorized extends KafkaPrincipalBuilder { + class TestPrincipalBuilderReturningUnauthorized extends DefaultKafkaPrincipalBuilder(null, null) { override def build(context: AuthenticationContext): KafkaPrincipal = { UnauthorizedPrincipal } diff --git a/docs/upgrade.html b/docs/upgrade.html index 5a9555e9e48..c0d8eab3bc4 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -32,6 +32,8 @@ For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.
  • Kafka Streams no longer has a compile time dependency on "connect:json" module (KAFKA-5146). Projects that were relying on this transitive dependency will have to explicitly declare it.
  • +
  • Custom principal builder implementations specified through principal.builder.class must now implement the + KafkaPrincipalSerde interface to allow for forwarding between brokers. See KIP-590 for more details about the usage of KafkaPrincipalSerde.
  • A number of deprecated classes, methods and tools have been removed from the clients, connect, core and tools modules: