From e8a3bc74254a8e4e4aaca41395177fa4a98b480c Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Fri, 26 Oct 2018 18:18:15 -0400 Subject: [PATCH] KAFKA-7352; Allow SASL Connections to Periodically Re-Authenticate (KIP-368) (#5582) KIP-368 implementation to enable periodic re-authentication of SASL clients. Also adds a broker configuration option to terminate client connections that do not re-authenticate within the configured interval. --- checkstyle/suppressions.xml | 2 +- .../org/apache/kafka/clients/ClientUtils.java | 5 +- .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../internals/BrokerSecurityConfigs.java | 6 + .../kafka/common/network/Authenticator.java | 102 ++++ .../kafka/common/network/ChannelBuilders.java | 17 +- .../kafka/common/network/KafkaChannel.java | 242 +++++++- .../network/PlaintextChannelBuilder.java | 5 +- .../network/ReauthenticationContext.java | 94 ++++ .../common/network/SaslChannelBuilder.java | 53 +- .../apache/kafka/common/network/Selector.java | 59 +- .../common/network/SslChannelBuilder.java | 5 +- .../requests/SaslAuthenticateRequest.java | 6 +- .../requests/SaslAuthenticateResponse.java | 23 +- .../SaslClientAuthenticator.java | 244 ++++++++- .../authenticator/SaslInternalConfigs.java | 41 ++ .../SaslServerAuthenticator.java | 302 ++++++++-- .../internals/OAuthBearerSaslServer.java | 4 +- .../scram/internals/ScramSaslServer.java | 7 +- .../internals/ScramServerCallbackHandler.java | 4 + .../DelegationTokenCredentialCallback.java | 9 + .../common/network/NetworkTestUtils.java | 10 + .../kafka/common/network/NioEchoServer.java | 128 +++-- .../network/SaslChannelBuilderTest.java | 3 +- .../common/network/SslTransportLayerTest.java | 4 +- .../kafka/common/protocol/ApiKeysTest.java | 4 +- .../common/requests/RequestResponseTest.java | 26 +- .../common/security/TestSecurityConfig.java | 2 + 
.../SaslAuthenticatorFailureDelayTest.java | 2 +- .../authenticator/SaslAuthenticatorTest.java | 517 ++++++++++++++++-- .../SaslServerAuthenticatorTest.java | 3 +- .../internals/OAuthBearerSaslServerTest.java | 11 + .../org/apache/kafka/test/TestCondition.java | 1 + .../java/org/apache/kafka/test/TestUtils.java | 30 +- .../distributed/WorkerGroupMember.java | 2 +- .../main/scala/kafka/admin/AdminClient.scala | 2 +- .../controller/ControllerChannelManager.scala | 1 + .../TransactionMarkerChannelManager.scala | 1 + .../scala/kafka/network/SocketServer.scala | 55 +- .../main/scala/kafka/server/KafkaConfig.scala | 8 + .../main/scala/kafka/server/KafkaServer.scala | 1 + .../server/ReplicaFetcherBlockingSend.scala | 1 + .../kafka/tools/ReplicaVerificationTool.scala | 2 +- .../kafka/api/EndToEndAuthorizationTest.scala | 35 ++ .../api/SaslEndToEndAuthorizationTest.scala | 1 + .../server/GssapiAuthenticationTest.scala | 2 +- .../scala/unit/kafka/KafkaConfigTest.scala | 16 + .../unit/kafka/server/KafkaConfigTest.scala | 1 + docs/ops.html | 45 ++ .../workload/ConnectionStressWorker.java | 11 +- 52 files changed, 1938 insertions(+), 223 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/network/ReauthenticationContext.java create mode 100644 clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslInternalConfigs.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 5046c7617ff..f3ab7ecfa83 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -51,7 +51,7 @@ files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/> + files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index dce5f3fb65c..fe83c5c54e9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,11 +103,11 @@ public final class ClientUtils { * @param config client configs * @return configured ChannelBuilder based on the configs. */ - public static ChannelBuilder createChannelBuilder(AbstractConfig config) { + public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time) { SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM); return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null, - clientSaslMechanism, true); + clientSaslMechanism, time, true); } static List resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 15021bc1db1..33a47868714 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -350,7 +350,7 @@ public class KafkaAdminClient extends AdminClient { reporters.add(new JmxReporter(JMX_PREFIX)); metrics = new Metrics(metricConfig, reporters, time); String metricGrpPrefix = "admin-client"; - channelBuilder = 
ClientUtils.createChannelBuilder(config); + channelBuilder = ClientUtils.createChannelBuilder(config, time); selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext); networkClient = new NetworkClient( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 4061373c450..714cd94d6b2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -715,7 +715,7 @@ public class KafkaConsumer implements Consumer { this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0); String metricGrpPrefix = "consumer"; ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer"); - ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time); IsolationLevel isolationLevel = IsolationLevel.valueOf( config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT)); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f1ccf223b83..e91677113d2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -438,7 +438,7 @@ public class KafkaProducer implements Producer { Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) { int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null); int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); - ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(producerConfig); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time); ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics); KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient( diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java index e3a8a774a51..98f6467a484 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java @@ -35,6 +35,7 @@ public class BrokerSecurityConfigs { public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms"; public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class"; public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules"; + public static final String CONNECTIONS_MAX_REAUTH_MS = "connections.max.reauth.ms"; public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " + "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " + @@ -84,4 +85,9 @@ public class BrokerSecurityConfigs { + "listener prefix and SASL mechanism name in lower-case. For example, " + "listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler."; + public static final String CONNECTIONS_MAX_REAUTH_MS_DOC = "When explicitly set to a positive number (the default is 0, not a positive number), " + + "a session lifetime that will not exceed the configured value will be communicated to v2.2.0 or later clients when they authenticate. 
" + + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently " + + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL " + + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000"; } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java index 33c2e908516..bcb011e830b 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import java.io.Closeable; import java.io.IOException; +import java.util.Collections; +import java.util.List; /** * Authentication for Channel @@ -54,4 +56,104 @@ public interface Authenticator extends Closeable { * returns true if authentication is complete otherwise returns false; */ boolean complete(); + + /** + * Begins re-authentication. Uses transportLayer to read or write tokens as is + * done for {@link #authenticate()}. For security protocols PLAINTEXT and SSL, + * this is a no-op since re-authentication does not apply/is not supported, + * respectively. For SASL_PLAINTEXT and SASL_SSL, this performs a SASL + * authentication. Any in-flight responses from prior requests can/will be read + * and collected for later processing as required. There must not be partially + * written requests; any request queued for writing (for which zero bytes have + * been written) remains queued until after re-authentication succeeds. + * + * @param reauthenticationContext + * the context in which this re-authentication is occurring. 
This + * instance is responsible for closing the previous Authenticator + * returned by + * {@link ReauthenticationContext#previousAuthenticator()}. + * @throws AuthenticationException + * if authentication fails due to invalid credentials or other + * security configuration errors + * @throws IOException + * if read/write fails due to an I/O error + */ + default void reauthenticate(ReauthenticationContext reauthenticationContext) throws IOException { + // empty + } + + /** + * Return the session expiration time, if any, otherwise null. The value is in + * nanoseconds as per {@code System.nanoTime()} and is therefore only useful + * when compared to such a value -- its absolute value is meaningless. This + * value may be non-null only on the server-side. It represents the time after + * which, in the absence of re-authentication, the broker will close the session + * if it receives a request unrelated to authentication. We store nanoseconds + * here to avoid having to invoke the more expensive {@code milliseconds()} call + * on the broker for every request + * + * @return the session expiration time, if any, otherwise null + */ + default Long serverSessionExpirationTimeNanos() { + return null; + } + + /** + * Return the time on or after which a client should re-authenticate this + * session, if any, otherwise null. The value is in nanoseconds as per + * {@code System.nanoTime()} and is therefore only useful when compared to such + * a value -- its absolute value is meaningless. This value may be non-null + * only on the client-side. It will be a random time between 85% and 95% of the + * full session lifetime to account for latency between client and server and to + * avoid re-authentication storms that could be caused by many sessions + * re-authenticating simultaneously. 
+ * + * @return the time on or after which a client should re-authenticate this + * session, if any, otherwise null + */ + default Long clientSessionReauthenticationTimeNanos() { + return null; + } + + /** + * Return the number of milliseconds that elapsed while re-authenticating this + * session from the perspective of this instance, if applicable, otherwise null. + * The server-side perspective will yield a lower value than the client-side + * perspective of the same re-authentication because the client-side observes an + * additional network round-trip. + * + * @return the number of milliseconds that elapsed while re-authenticating this + * session from the perspective of this instance, if applicable, + * otherwise null + */ + default Long reauthenticationLatencyMs() { + return null; + } + + /** + * Return the (always non-null but possibly empty) client-side + * {@link NetworkReceive} responses that arrived during re-authentication that + * are unrelated to re-authentication, if any. These correspond to requests sent + * prior to the beginning of re-authentication; the requests were made when the + * channel was successfully authenticated, and the responses arrived during the + * re-authentication process. 
+ * + * @return the (always non-null but possibly empty) client-side + * {@link NetworkReceive} responses that arrived during + * re-authentication that are unrelated to re-authentication, if any + */ + default List getAndClearResponsesReceivedDuringReauthentication() { + return Collections.emptyList(); + } + + /** + * Return true if this is a server-side authenticator and the connected client + * has indicated that it supports re-authentication, otherwise false + * + * @return true if this is a server-side authenticator and the connected client + * has indicated that it supports re-authentication, otherwise false + */ + default boolean connectedClientSupportsReauthentication() { + return false; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index b3040f3ef73..4257c957cd6 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.ssl.SslPrincipalMapper; import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import java.util.Collections; @@ -36,7 +37,6 @@ import java.util.List; import java.util.Map; public class ChannelBuilders { - private ChannelBuilders() { } /** @@ -55,6 +55,7 @@ public class ChannelBuilders { AbstractConfig config, ListenerName listenerName, String clientSaslMechanism, + Time time, boolean saslHandshakeRequestEnable) { if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) { @@ -64,7 +65,7 @@ public class ChannelBuilders { throw new 
IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`"); } return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism, - saslHandshakeRequestEnable, null, null); + saslHandshakeRequestEnable, null, null, time); } /** @@ -79,9 +80,10 @@ public class ChannelBuilders { SecurityProtocol securityProtocol, AbstractConfig config, CredentialCache credentialCache, - DelegationTokenCache tokenCache) { + DelegationTokenCache tokenCache, + Time time) { return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, - isInterBrokerListener, null, true, credentialCache, tokenCache); + isInterBrokerListener, null, true, credentialCache, tokenCache, time); } private static ChannelBuilder create(SecurityProtocol securityProtocol, @@ -93,7 +95,8 @@ public class ChannelBuilders { String clientSaslMechanism, boolean saslHandshakeRequestEnable, CredentialCache credentialCache, - DelegationTokenCache tokenCache) { + DelegationTokenCache tokenCache, + Time time) { Map configs; if (listenerName == null) configs = config.values(); @@ -111,6 +114,7 @@ public class ChannelBuilders { requireNonNullMode(mode, securityProtocol); Map jaasContexts; if (mode == Mode.SERVER) { + @SuppressWarnings("unchecked") List enabledMechanisms = (List) configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG); jaasContexts = new HashMap<>(enabledMechanisms.size()); for (String mechanism : enabledMechanisms) @@ -129,7 +133,8 @@ public class ChannelBuilders { clientSaslMechanism, saslHandshakeRequestEnable, credentialCache, - tokenCache); + tokenCache, + time); break; case PLAINTEXT: channelBuilder = new PlaintextChannelBuilder(listenerName); diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 6a12ef23310..47b137512ce 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -27,9 +27,45 @@ import java.net.InetAddress; import java.net.Socket; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.List; import java.util.Objects; +import java.util.function.Supplier; +/** + * A Kafka connection either existing on a client (which could be a broker in an + * inter-broker scenario) and representing the channel to a remote broker or the + * reverse (existing on a broker and representing the channel to a remote + * client, which could be a broker in an inter-broker scenario). + *
<p>
+ * Each instance has the following:
+ * <ul>
+ * <li>a unique ID identifying it in the {@code KafkaClient} instance via which
+ * the connection was made on the client-side or in the instance where it was
+ * accepted on the server-side</li>
+ * <li>a reference to the underlying {@link TransportLayer} to allow reading and
+ * writing</li>
+ * <li>an {@link Authenticator} that performs the authentication (or
+ * re-authentication, if that feature is enabled and it applies to this
+ * connection) by reading and writing directly from/to the same
+ * {@link TransportLayer}.</li>
+ * <li>a {@link MemoryPool} into which responses are read (typically the JVM
+ * heap for clients, though smaller pools can be used for brokers and for
+ * testing out-of-memory scenarios)</li>
+ * <li>a {@link NetworkReceive} representing the current incomplete/in-progress
+ * request (from the server-side perspective) or response (from the client-side
+ * perspective) being read, if applicable; or a non-null value that has had no
+ * data read into it yet or a null value if there is no in-progress
+ * request/response (either could be the case)</li>
+ * <li>a {@link Send} representing the current request (from the client-side
+ * perspective) or response (from the server-side perspective) that is either
+ * waiting to be sent or partially sent, if applicable, or null</li>
+ * <li>a {@link ChannelMuteState} to document if the channel has been muted due
+ * to memory pressure or other reasons</li>
+ * </ul>
+ */ public class KafkaChannel { + private static final long MIN_REAUTH_INTERVAL_ONE_SECOND_NANOS = 1000 * 1000 * 1000; + /** * Mute States for KafkaChannel: *