KAFKA-18926 KafkaPrincipalBuilder should extend KafkaPrincipalSerde (#19987)

In KRaft, custom KafkaPrincipalBuilder instances must implement
KafkaPrincipalSerde to support the forward mechanism. Currently, this
requirement is not enforced and relies on the developer's attention.

With this patch, we can prevent incorrect implementations at compile
time.

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
 <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
S.Y. Wang 2025-06-22 23:01:03 +09:00 committed by GitHub
parent fa4d8836d3
commit 22bef988d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 29 additions and 11 deletions

View File

@ -103,7 +103,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty();
return Optional.of(principalBuilder);
}
@Override

View File

@ -164,7 +164,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty();
return Optional.of(principalBuilder);
}
@Override

View File

@ -23,12 +23,8 @@ package org.apache.kafka.common.security.auth;
* Note that the {@link org.apache.kafka.common.Configurable} and {@link java.io.Closeable}
* interfaces are respected if implemented. Additionally, implementations must provide a
* default no-arg constructor.
*
* Note that custom implementations of {@link KafkaPrincipalBuilder}
* must also implement {@link KafkaPrincipalSerde}, otherwise brokers will not be able to
* forward requests to the controller.
*/
public interface KafkaPrincipalBuilder {
public interface KafkaPrincipalBuilder extends KafkaPrincipalSerde {
/**
* Build a kafka principal from the authentication context.
* @param context The authentication context (either {@link SslAuthenticationContext} or

View File

@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
@ -50,7 +49,7 @@ import javax.security.sasl.SaslServer;
*
* NOTE: This is an internal class and can change without notice.
*/
public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, KafkaPrincipalSerde {
public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder {
private final KerberosShortNamer kerberosShortNamer;
private final SslPrincipalMapper sslPrincipalMapper;

View File

@ -318,7 +318,7 @@ public class SaslServerAuthenticator implements Authenticator {
@Override
public Optional<KafkaPrincipalSerde> principalSerde() {
return principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) principalBuilder) : Optional.empty();
return Optional.of(principalBuilder);
}
@Override

View File

@ -118,5 +118,15 @@ public class ChannelBuildersTest {
public KafkaPrincipal build(AuthenticationContext context) {
return null;
}
@Override
public byte[] serialize(KafkaPrincipal principal) {
return new byte[0];
}
@Override
public KafkaPrincipal deserialize(byte[] bytes) {
return null;
}
}
}

View File

@ -2616,5 +2616,15 @@ public class SaslAuthenticatorTest {
static KafkaPrincipal saslSslPrincipal(String saslPrincipal, String sslPrincipal) {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslPrincipal + ":" + sslPrincipal);
}
@Override
public byte[] serialize(KafkaPrincipal principal) {
return new byte[0];
}
@Override
public KafkaPrincipal deserialize(byte[] bytes) {
return null;
}
}
}

View File

@ -835,7 +835,7 @@ object RequestQuotaTest {
}
}
class TestPrincipalBuilder extends KafkaPrincipalBuilder with KafkaPrincipalSerde {
class TestPrincipalBuilder extends KafkaPrincipalBuilder {
override def build(context: AuthenticationContext): KafkaPrincipal = {
principal
}

View File

@ -31,6 +31,9 @@
<li>
The <code>remote.log.manager.thread.pool.size</code> config was deprecated. Please use the <code>remote.log.manager.follower.thread.pool.size</code> instead.
</li>
<li>The <code>KafkaPrincipalBuilder</code> now extends <code>KafkaPrincipalSerde</code>. Force developer to implement <code>KafkaPrincipalSerde</code> interface for custom <code>KafkaPrincipalBuilder</code>.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/1gq9F">KIP-1157</a>.
</li>
</ul>