mirror of https://github.com/apache/kafka.git
KAFKA-10727; Handle Kerberos error during re-login as transient failure in clients (#9605)
We use a background thread for Kerberos to perform re-login before tickets expire. The thread performs logout() followed by login(), relying on the Java library to clear and then populate credentials in Subject. This leaves a timing window where clients fail to authenticate because credentials are not available. We cannot introduce any form of locking since authentication is performed on the network thread. So this commit treats NO_CRED as a transient failure rather than a fatal authentication exception in clients. Reviewers: Ron Dagostino <rdagostino@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
parent
9fdea8a6b6
commit
ed8659b4a0
|
@ -327,7 +327,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
|
|||
}
|
||||
}
|
||||
|
||||
private Class<? extends Login> defaultLoginClass() {
|
||||
protected Class<? extends Login> defaultLoginClass() {
|
||||
if (jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM))
|
||||
return KerberosLogin.class;
|
||||
if (OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(clientSaslMechanism))
|
||||
|
|
|
@ -539,17 +539,19 @@ public class SaslClientAuthenticator implements Authenticator {
|
|||
" Users must configure FQDN of kafka brokers when authenticating using SASL and" +
|
||||
" `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
|
||||
}
|
||||
error += " Kafka Client will go to AUTHENTICATION_FAILED state.";
|
||||
//Unwrap the SaslException inside `PrivilegedActionException`
|
||||
Throwable cause = e.getCause();
|
||||
// Treat transient Kerberos errors as non-fatal SaslExceptions that are processed as I/O exceptions
|
||||
// and all other failures as fatal SaslAuthenticationException.
|
||||
if (kerberosError != null && kerberosError.retriable())
|
||||
if ((kerberosError != null && kerberosError.retriable()) || (kerberosError == null && KerberosError.isRetriableClientGssException(e))) {
|
||||
error += " Kafka Client will retry.";
|
||||
throw new SaslException(error, cause);
|
||||
else
|
||||
} else {
|
||||
error += " Kafka Client will go to AUTHENTICATION_FAILED state.";
|
||||
throw new SaslAuthenticationException(error, cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean flushNetOutBuffer() throws IOException {
|
||||
if (!netOutBuffer.completed()) {
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.kafka.common.security.kerberos;
|
|||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
|
||||
import org.apache.kafka.common.utils.Java;
|
||||
import org.ietf.jgss.GSSException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -109,4 +110,22 @@ public enum KerberosError {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the exception should be handled as a transient failure on clients.
|
||||
* We handle GSSException.NO_CRED as retriable on the client-side since this may
|
||||
* occur during re-login if a clients attempts to authentication after logout, but
|
||||
* before the subsequent login.
|
||||
*/
|
||||
public static boolean isRetriableClientGssException(Exception exception) {
|
||||
Throwable cause = exception.getCause();
|
||||
while (cause != null && !(cause instanceof GSSException)) {
|
||||
cause = cause.getCause();
|
||||
}
|
||||
if (cause != null) {
|
||||
GSSException gssException = (GSSException) cause;
|
||||
return gssException.getMajor() == GSSException.NO_CRED;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -346,7 +346,7 @@ public class KerberosLogin extends AbstractLogin {
|
|||
* Re-login a principal. This method assumes that {@link #login()} has happened already.
|
||||
* @throws javax.security.auth.login.LoginException on a failure
|
||||
*/
|
||||
private void reLogin() throws LoginException {
|
||||
protected void reLogin() throws LoginException {
|
||||
if (!isKrbTicket) {
|
||||
return;
|
||||
}
|
||||
|
@ -363,7 +363,7 @@ public class KerberosLogin extends AbstractLogin {
|
|||
//clear up the kerberos state. But the tokens are not cleared! As per
|
||||
//the Java kerberos login module code, only the kerberos credentials
|
||||
//are cleared
|
||||
loginContext.logout();
|
||||
logout();
|
||||
//login and also update the subject field of this instance to
|
||||
//have the new credentials (pass it to the LoginContext constructor)
|
||||
loginContext = new LoginContext(contextName(), subject, null, configuration());
|
||||
|
@ -372,6 +372,11 @@ public class KerberosLogin extends AbstractLogin {
|
|||
}
|
||||
}
|
||||
|
||||
// Visibility to override for testing
|
||||
protected void logout() throws LoginException {
|
||||
loginContext.logout();
|
||||
}
|
||||
|
||||
private long currentElapsedTime() {
|
||||
return time.hiResClockMs();
|
||||
}
|
||||
|
|
|
@ -20,8 +20,8 @@ package kafka.server
|
|||
|
||||
import java.net.InetSocketAddress
|
||||
import java.time.Duration
|
||||
import java.util.Properties
|
||||
import java.util.concurrent.{Executors, TimeUnit}
|
||||
import java.util.{Collections, Properties}
|
||||
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
|
||||
|
||||
import kafka.api.{Both, IntegrationTestHarness, SaslSetup}
|
||||
import kafka.utils.TestUtils
|
||||
|
@ -31,7 +31,8 @@ import org.apache.kafka.common.config.SaslConfigs
|
|||
import org.apache.kafka.common.errors.SaslAuthenticationException
|
||||
import org.apache.kafka.common.network._
|
||||
import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||
import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
|
||||
import org.apache.kafka.common.security.kerberos.KerberosLogin
|
||||
import org.apache.kafka.common.utils.{LogContext, MockTime}
|
||||
import org.junit.Assert._
|
||||
import org.junit.{After, Before, Test}
|
||||
|
@ -57,6 +58,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
|
|||
|
||||
@Before
|
||||
override def setUp(): Unit = {
|
||||
TestableKerberosLogin.reset()
|
||||
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
|
||||
serverConfig.put(KafkaConfig.SslClientAuthProp, "required")
|
||||
serverConfig.put(KafkaConfig.FailedAuthenticationDelayMsProp, failedAuthenticationDelayMs.toString)
|
||||
|
@ -78,6 +80,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
|
|||
executor.shutdownNow()
|
||||
super.tearDown()
|
||||
closeSasl()
|
||||
TestableKerberosLogin.reset()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -96,6 +99,35 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
|
|||
assertTrue("Too few authentications: " + successfulAuths, successfulAuths > successfulAuthsPerThread * numThreads)
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that there are no authentication failures during Kerberos re-login. If authentication
|
||||
* is performed when credentials are unavailable between logout and login, we handle it as a
|
||||
* transient error and not an authentication failure so that clients may retry.
|
||||
*/
|
||||
@Test
|
||||
def testReLogin(): Unit = {
|
||||
val selector = createSelectorWithRelogin()
|
||||
try {
|
||||
val login = TestableKerberosLogin.instance
|
||||
assertNotNull(login)
|
||||
executor.submit(() => login.reLogin(), 0)
|
||||
|
||||
val node1 = "1"
|
||||
selector.connect(node1, serverAddr, 1024, 1024)
|
||||
login.logoutResumeLatch.countDown()
|
||||
login.logoutCompleteLatch.await(15, TimeUnit.SECONDS)
|
||||
assertFalse("Authenticated during re-login", pollUntilReadyOrDisconnected(selector, node1))
|
||||
|
||||
login.reLoginResumeLatch.countDown()
|
||||
login.reLoginCompleteLatch.await(15, TimeUnit.SECONDS)
|
||||
val node2 = "2"
|
||||
selector.connect(node2, serverAddr, 1024, 1024)
|
||||
assertTrue("Authenticated failed after re-login", pollUntilReadyOrDisconnected(selector, node2))
|
||||
} finally {
|
||||
selector.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that Kerberos error `Server not found in Kerberos database (7)` is handled
|
||||
* as a fatal authentication failure.
|
||||
|
@ -149,16 +181,8 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
|
|||
while (actualSuccessfulAuths < numSuccessfulAuths) {
|
||||
val nodeId = actualSuccessfulAuths.toString
|
||||
selector.connect(nodeId, serverAddr, 1024, 1024)
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
selector.poll(100)
|
||||
val disconnectState = selector.disconnected().get(nodeId)
|
||||
// Verify that disconnect state is not AUTHENTICATION_FAILED
|
||||
if (disconnectState != null)
|
||||
assertEquals(s"Authentication failed with exception ${disconnectState.exception()}",
|
||||
ChannelState.State.AUTHENTICATE, disconnectState.state())
|
||||
selector.isChannelReady(nodeId) || disconnectState != null
|
||||
}, "Client not ready or disconnected within timeout")
|
||||
if (selector.isChannelReady(nodeId))
|
||||
val isReady = pollUntilReadyOrDisconnected(selector, nodeId)
|
||||
if (isReady)
|
||||
actualSuccessfulAuths += 1
|
||||
selector.close(nodeId)
|
||||
}
|
||||
|
@ -167,6 +191,22 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
|
|||
}
|
||||
}
|
||||
|
||||
private def pollUntilReadyOrDisconnected(selector: Selector, nodeId: String): Boolean = {
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
selector.poll(100)
|
||||
val disconnectState = selector.disconnected().get(nodeId)
|
||||
// Verify that disconnect state is not AUTHENTICATION_FAILED
|
||||
if (disconnectState != null) {
|
||||
assertEquals(s"Authentication failed with exception ${disconnectState.exception()}",
|
||||
ChannelState.State.AUTHENTICATE, disconnectState.state())
|
||||
}
|
||||
selector.isChannelReady(nodeId) || disconnectState != null
|
||||
}, "Client not ready or disconnected within timeout")
|
||||
val isReady = selector.isChannelReady(nodeId)
|
||||
selector.close(nodeId)
|
||||
isReady
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that authentication with the current `clientConfig` results in disconnection and that
|
||||
* the disconnection is notified with disconnect state `AUTHENTICATION_FAILED`. This is to ensure
|
||||
|
@ -192,4 +232,45 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
|
|||
time, true, new LogContext())
|
||||
NetworkTestUtils.createSelector(channelBuilder, time)
|
||||
}
|
||||
|
||||
private def createSelectorWithRelogin(): Selector = {
|
||||
clientConfig.setProperty(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, "0")
|
||||
val config = new TestSecurityConfig(clientConfig)
|
||||
val jaasContexts = Collections.singletonMap("GSSAPI", JaasContext.loadClientContext(config.values()))
|
||||
val channelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol,
|
||||
null, false, kafkaClientSaslMechanism, true, null, null, time, new LogContext()) {
|
||||
override protected def defaultLoginClass(): Class[_ <: Login] = classOf[TestableKerberosLogin]
|
||||
}
|
||||
channelBuilder.configure(config.values())
|
||||
NetworkTestUtils.createSelector(channelBuilder, time)
|
||||
}
|
||||
}
|
||||
|
||||
object TestableKerberosLogin {
|
||||
@volatile var instance: TestableKerberosLogin = _
|
||||
def reset(): Unit = {
|
||||
instance = null
|
||||
}
|
||||
}
|
||||
|
||||
class TestableKerberosLogin extends KerberosLogin {
|
||||
val logoutResumeLatch = new CountDownLatch(1)
|
||||
val logoutCompleteLatch = new CountDownLatch(1)
|
||||
val reLoginResumeLatch = new CountDownLatch(1)
|
||||
val reLoginCompleteLatch = new CountDownLatch(1)
|
||||
|
||||
assertNull(TestableKerberosLogin.instance)
|
||||
TestableKerberosLogin.instance = this
|
||||
|
||||
override def reLogin(): Unit = {
|
||||
super.reLogin()
|
||||
reLoginCompleteLatch.countDown()
|
||||
}
|
||||
|
||||
override protected def logout(): Unit = {
|
||||
logoutResumeLatch.await(15, TimeUnit.SECONDS)
|
||||
super.logout()
|
||||
logoutCompleteLatch.countDown()
|
||||
reLoginResumeLatch.await(15, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue