KAFKA-13051; Require principal builders implement `KafkaPrincipalSerde` and set default (#11011)

This patch adds a check to ensure that principal builder implementations implement `KafkaPrincipalSerde` as specified in KIP-590: https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller. This patch also changes the default value of `principal.builder.class` to `DefaultKafkaPrincipalBuilder`, which was already the implicit behavior when no principal builder was specified.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Ryan Dielhenn 2021-07-13 10:54:36 -07:00 committed by GitHub
parent a08e0cfe65
commit f97f36b650
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 37 additions and 19 deletions

View File

@ -39,7 +39,9 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
@ -247,6 +249,7 @@ object Defaults {
/** ********* General Security configuration ***********/
val ConnectionsMaxReauthMsDefault = 0L
val DefaultPrincipalSerde = classOf[DefaultKafkaPrincipalBuilder]
/** ********* Sasl configuration ***********/
val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
@ -1235,7 +1238,7 @@ object KafkaConfig {
.define(securityProviderClassProp, STRING, null, LOW, securityProviderClassDoc)
/** ********* SSL Configuration ****************/
.define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc)
.define(PrincipalBuilderClassProp, CLASS, Defaults.DefaultPrincipalSerde, MEDIUM, PrincipalBuilderClassDoc)
.define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc)
.define(SslProviderProp, STRING, null, MEDIUM, SslProviderDoc)
.define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc)
@ -1971,5 +1974,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
s" authentication responses from timing out")
val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
require(principalBuilderClass != null, s"${KafkaConfig.PrincipalBuilderClassProp} must be non-null")
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
s"${KafkaConfig.PrincipalBuilderClassProp} must implement KafkaPrincipalSerde")
}
}

View File

@ -54,7 +54,8 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid, requests}
import org.apache.kafka.test.{TestUtils => JTestUtils}
@ -76,7 +77,7 @@ object AuthorizerIntegrationTest {
val BrokerListenerName = "BROKER"
val ClientListenerName = "CLIENT"
class PrincipalBuilder extends KafkaPrincipalBuilder {
class PrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
context.listenerName match {
case BrokerListenerName => BrokerPrincipal

View File

@ -27,7 +27,8 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.errors.TopicAuthorizationException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
@ -40,7 +41,7 @@ object GroupAuthorizerIntegrationTest {
val BrokerListenerName = "BROKER"
val ClientListenerName = "CLIENT"
class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
context.listenerName match {
case BrokerListenerName => BrokerPrincipal

View File

@ -19,12 +19,13 @@ package kafka.api
import kafka.api.GroupEndToEndAuthorizationTest._
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, SaslAuthenticationContext}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
object GroupEndToEndAuthorizationTest {
val GroupPrincipalType = "Group"
val ClientGroup = "testGroup"
class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
class GroupPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
context match {
case ctx: SaslAuthenticationContext =>

View File

@ -19,6 +19,7 @@ package kafka.api
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
import org.apache.kafka.common.errors.TopicAuthorizationException
@ -30,7 +31,7 @@ object PlaintextEndToEndAuthorizationTest {
private var clientListenerName = None: Option[String]
@volatile
private var serverListenerName = None: Option[String]
class TestClientPrincipalBuilder extends KafkaPrincipalBuilder {
class TestClientPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
clientListenerName = Some(context.listenerName)
context match {
@ -42,7 +43,7 @@ object PlaintextEndToEndAuthorizationTest {
}
}
class TestServerPrincipalBuilder extends KafkaPrincipalBuilder {
class TestServerPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
serverListenerName = Some(context.listenerName)
context match {

View File

@ -31,13 +31,14 @@ import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.Mode
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
object SaslPlainSslEndToEndAuthorizationTest {
class TestPrincipalBuilder extends KafkaPrincipalBuilder {
class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
val saslContext = context.asInstanceOf[SaslAuthenticationContext]

View File

@ -24,11 +24,12 @@ import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.Mode
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.Java
import org.junit.jupiter.api.BeforeEach
object SslEndToEndAuthorizationTest {
class TestPrincipalBuilder extends KafkaPrincipalBuilder {
class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
private val Pattern = "O=A (.*?),CN=(.*?)".r
// Use full DN as client principal to test special characters in principal

View File

@ -28,7 +28,8 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.
import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -383,13 +384,13 @@ object AlterCredentialsTest {
}
}
class TestPrincipalBuilderReturningAuthorized extends KafkaPrincipalBuilder {
class TestPrincipalBuilderReturningAuthorized extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
AuthorizedPrincipal
}
}
class TestPrincipalBuilderReturningUnauthorized extends KafkaPrincipalBuilder {
class TestPrincipalBuilderReturningUnauthorized extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
UnauthorizedPrincipal
}

View File

@ -40,7 +40,7 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest
import org.apache.kafka.common.requests.DeleteTopicsResponse
import org.apache.kafka.common.security.auth.AuthenticationContext
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
@ -53,7 +53,7 @@ import scala.jdk.CollectionConverters._
object ControllerMutationQuotaTest {
// Principal used for all client connections. This is updated by each test.
var principal = KafkaPrincipal.ANONYMOUS
class TestPrincipalBuilder extends KafkaPrincipalBuilder {
class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
principal
}

View File

@ -24,7 +24,8 @@ import org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData,
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -125,13 +126,13 @@ object DescribeCredentialsTest {
}
}
class TestPrincipalBuilderReturningAuthorized extends KafkaPrincipalBuilder {
class TestPrincipalBuilderReturningAuthorized extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
AuthorizedPrincipal
}
}
class TestPrincipalBuilderReturningUnauthorized extends KafkaPrincipalBuilder {
class TestPrincipalBuilderReturningUnauthorized extends DefaultKafkaPrincipalBuilder(null, null) {
override def build(context: AuthenticationContext): KafkaPrincipal = {
UnauthorizedPrincipal
}

View File

@ -32,6 +32,8 @@
For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li>
<li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
<li>Custom principal builder implementations specified through <code>principal.builder.class</code> must now implement the
<code>KafkaPrincipalSerde</code> interface to allow for forwarding between brokers. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller">KIP-590</a> for more details about the usage of KafkaPrincipalSerde.</li>
<li>A number of deprecated classes, methods and tools have been removed from the <code>clients</code>, <code>connect</code>, <code>core</code> and <code>tools</code> modules:</li>
<ul>
<li>The Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>