diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index c6181b81c5e..cf4ef470af0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -103,7 +103,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder { @Override public Optional principalSerde() { - return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty(); + return Optional.of(principalBuilder); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index b45fb07442e..a35a0b8b209 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -164,7 +164,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable @Override public Optional principalSerde() { - return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty(); + return Optional.of(principalBuilder); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java index ec4317268d1..92be58ea2dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java @@ -23,12 +23,8 @@ package org.apache.kafka.common.security.auth; * Note that the {@link org.apache.kafka.common.Configurable} and {@link java.io.Closeable} * interfaces are respected if implemented. Additionally, implementations must provide a * default no-arg constructor. - * - * Note that custom implementations of {@link KafkaPrincipalBuilder} - * must also implement {@link KafkaPrincipalSerde}, otherwise brokers will not be able to - * forward requests to the controller. */ -public interface KafkaPrincipalBuilder { +public interface KafkaPrincipalBuilder extends KafkaPrincipalSerde { /** * Build a kafka principal from the authentication context. * @param context The authentication context (either {@link SslAuthenticationContext} or diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java index fa654bcb928..5ba472263dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.security.auth.AuthenticationContext; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; -import org.apache.kafka.common.security.auth.KafkaPrincipalSerde; import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext; import org.apache.kafka.common.security.auth.SaslAuthenticationContext; import org.apache.kafka.common.security.auth.SslAuthenticationContext; @@ -50,7 +49,7 @@ import javax.security.sasl.SaslServer; * * NOTE: This is an internal class and can change without notice. */ -public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, KafkaPrincipalSerde { +public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder { private final KerberosShortNamer kerberosShortNamer; private final SslPrincipalMapper sslPrincipalMapper; diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index e2ebaa31cd2..a0dbe5b21dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -318,7 +318,7 @@ public class SaslServerAuthenticator implements Authenticator { @Override public Optional principalSerde() { - return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty(); + return Optional.of(principalBuilder); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java index e84c7c5e7c2..01936f457c6 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java @@ -118,5 +118,15 @@ public class ChannelBuildersTest { public KafkaPrincipal build(AuthenticationContext context) { return null; } + + @Override + public byte[] serialize(KafkaPrincipal principal) { + return new byte[0]; + } + + @Override + public KafkaPrincipal deserialize(byte[] bytes) { + return null; + } } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index c21b5d11023..a15414a73d0 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -2616,5 +2616,15 @@ public class SaslAuthenticatorTest { static KafkaPrincipal saslSslPrincipal(String saslPrincipal, String sslPrincipal) { return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslPrincipal + ":" + sslPrincipal); } + + @Override + public byte[] serialize(KafkaPrincipal principal) { + return new byte[0]; + } + + @Override + public KafkaPrincipal deserialize(byte[] bytes) { + return null; + } } } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index db03c891e4c..0049204964d 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -835,7 +835,7 @@ object RequestQuotaTest { } } - class TestPrincipalBuilder extends KafkaPrincipalBuilder with KafkaPrincipalSerde { + class TestPrincipalBuilder extends KafkaPrincipalBuilder { override def build(context: AuthenticationContext): KafkaPrincipal = { principal } diff --git a/docs/upgrade.html b/docs/upgrade.html index 9cd6d9b866a..3beec9dcb14 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -31,6 +31,9 @@
  • The remote.log.manager.thread.pool.size config was deprecated. Please use the remote.log.manager.follower.thread.pool.size instead.
  • +
  • The KafkaPrincipalBuilder now extends KafkaPrincipalSerde. Force developer to implement KafkaPrincipalSerde interface for custom KafkaPrincipalBuilder. + For further details, please refer to KIP-1157. +