From 22bef988d490a5b653b92830c3e56cd1e00eaf65 Mon Sep 17 00:00:00 2001 From: "S.Y. Wang" Date: Sun, 22 Jun 2025 23:01:03 +0900 Subject: [PATCH] KAFKA-18926 KafkaPrincipalBuilder should extend KafkaPrincipalSerde (#19987) In KRaft, custom KafkaPrincipalBuilder instances must implement KafkaPrincipalSerde to support the forward mechanism. Currently, this requirement is not enforced and relies on the developer's attention. With this patch, we can prevent incorrect implementations at compile time. Reviewers: Ken Huang , TengYao Chi , Chia-Ping Tsai --- .../kafka/common/network/PlaintextChannelBuilder.java | 2 +- .../apache/kafka/common/network/SslChannelBuilder.java | 2 +- .../common/security/auth/KafkaPrincipalBuilder.java | 6 +----- .../authenticator/DefaultKafkaPrincipalBuilder.java | 3 +-- .../authenticator/SaslServerAuthenticator.java | 2 +- .../kafka/common/network/ChannelBuildersTest.java | 10 ++++++++++ .../security/authenticator/SaslAuthenticatorTest.java | 10 ++++++++++ .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- docs/upgrade.html | 3 +++ 9 files changed, 29 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index c6181b81c5e..cf4ef470af0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -103,7 +103,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder { @Override public Optional principalSerde() { - return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty(); + return Optional.of(principalBuilder); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index b45fb07442e..a35a0b8b209 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -164,7 +164,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable @Override public Optional principalSerde() { - return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty(); + return Optional.of(principalBuilder); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java index ec4317268d1..92be58ea2dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalBuilder.java @@ -23,12 +23,8 @@ package org.apache.kafka.common.security.auth; * Note that the {@link org.apache.kafka.common.Configurable} and {@link java.io.Closeable} * interfaces are respected if implemented. Additionally, implementations must provide a * default no-arg constructor. - * - * Note that custom implementations of {@link KafkaPrincipalBuilder} - * must also implement {@link KafkaPrincipalSerde}, otherwise brokers will not be able to - * forward requests to the controller. */ -public interface KafkaPrincipalBuilder { +public interface KafkaPrincipalBuilder extends KafkaPrincipalSerde { /** * Build a kafka principal from the authentication context. * @param context The authentication context (either {@link SslAuthenticationContext} or diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java index fa654bcb928..5ba472263dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.common.security.auth.AuthenticationContext; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; -import org.apache.kafka.common.security.auth.KafkaPrincipalSerde; import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext; import org.apache.kafka.common.security.auth.SaslAuthenticationContext; import org.apache.kafka.common.security.auth.SslAuthenticationContext; @@ -50,7 +49,7 @@ import javax.security.sasl.SaslServer; * * NOTE: This is an internal class and can change without notice. */ -public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, KafkaPrincipalSerde { +public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder { private final KerberosShortNamer kerberosShortNamer; private final SslPrincipalMapper sslPrincipalMapper; diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index e2ebaa31cd2..a0dbe5b21dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -318,7 +318,7 @@ public class SaslServerAuthenticator implements Authenticator { @Override public Optional principalSerde() { - return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty(); + return Optional.of(principalBuilder); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java index e84c7c5e7c2..01936f457c6 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java @@ -118,5 +118,15 @@ public class ChannelBuildersTest { public KafkaPrincipal build(AuthenticationContext context) { return null; } + + @Override + public byte[] serialize(KafkaPrincipal principal) { + return new byte[0]; + } + + @Override + public KafkaPrincipal deserialize(byte[] bytes) { + return null; + } } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index c21b5d11023..a15414a73d0 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -2616,5 +2616,15 @@ public class SaslAuthenticatorTest { static KafkaPrincipal saslSslPrincipal(String saslPrincipal, String sslPrincipal) { return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslPrincipal + ":" + sslPrincipal); } + + @Override + public byte[] serialize(KafkaPrincipal principal) { + return new byte[0]; + } + + @Override + public KafkaPrincipal deserialize(byte[] bytes) { + return null; + } } } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index db03c891e4c..0049204964d 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -835,7 +835,7 @@ object RequestQuotaTest { } } - class TestPrincipalBuilder extends KafkaPrincipalBuilder with KafkaPrincipalSerde { + class TestPrincipalBuilder extends KafkaPrincipalBuilder { override def build(context: AuthenticationContext): KafkaPrincipal = { principal } diff --git a/docs/upgrade.html b/docs/upgrade.html index 9cd6d9b866a..3beec9dcb14 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -31,6 +31,9 @@
  • The remote.log.manager.thread.pool.size config was deprecated. Please use the remote.log.manager.follower.thread.pool.size instead.
  • +
  • The KafkaPrincipalBuilder now extends KafkaPrincipalSerde. Force developer to implement KafkaPrincipalSerde interface for custom KafkaPrincipalBuilder. + For further details, please refer to KIP-1157. +