KAFKA-6004; Allow authentication providers to override error message

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #4015 from rajinisivaram/KAFKA-6004-auth-exception
This commit is contained in:
Rajini Sivaram 2017-10-04 13:44:46 -04:00
parent 5afeddaa99
commit 9949e1ed1b
9 changed files with 74 additions and 22 deletions

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
import javax.security.sasl.SaslServer;
/** /**
* This exception indicates that SASL authentication has failed. The error message * This exception indicates that SASL authentication has failed. The error message
* in the exception indicates the actual cause of failure. * in the exception indicates the actual cause of failure.
@ -24,6 +26,12 @@ package org.apache.kafka.common.errors;
* could also include other failures specific to the SASL mechanism used * could also include other failures specific to the SASL mechanism used
* for authentication. * for authentication.
* </p> * </p>
* <p><b>Note:</b>If {@link SaslServer#evaluateResponse(byte[])} throws this exception during
* authentication, the message from the exception will be sent to clients in the SaslAuthenticate
* response. Custom {@link SaslServer} implementations may throw this exception in order to
* provide custom error messages to clients, but should take care not to include any
* security-critical information in the message that should not be leaked to unauthenticated clients.
* </p>
*/ */
public class SaslAuthenticationException extends AuthenticationException { public class SaslAuthenticationException extends AuthenticationException {

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.Authenticator;
@ -380,9 +381,11 @@ public class SaslServerAuthenticator implements Authenticator {
// For versions with SASL_AUTHENTICATE header, send a response to SASL_AUTHENTICATE request even if token is empty. // For versions with SASL_AUTHENTICATE header, send a response to SASL_AUTHENTICATE request even if token is empty.
ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken); ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken);
sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf)); sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf));
} catch (SaslException e) { } catch (SaslAuthenticationException | SaslException e) {
String errorMessage = e instanceof SaslAuthenticationException ? e.getMessage() :
"Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism;
sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
"Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism)); errorMessage));
throw e; throw e;
} }
} }

View File

@ -26,6 +26,7 @@ import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer; import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory; import javax.security.sasl.SaslServerFactory;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler; import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler;
@ -56,8 +57,18 @@ public class PlainSaslServer implements SaslServer {
this.jaasContext = jaasContext; this.jaasContext = jaasContext;
} }
/**
* @throws SaslAuthenticationException if username/password combination is invalid or if the requested
* authorization id is not the same as username.
* <p>
* <b>Note:</b> This method may throw {@link SaslAuthenticationException} to provide custom error messages
* to clients. But care should be taken to avoid including any information in the exception message that
* should not be leaked to unauthenticated clients. It may be safer to throw {@link SaslException} in
* some cases so that a standard error message is returned to clients.
* </p>
*/
@Override @Override
public byte[] evaluateResponse(byte[] response) throws SaslException { public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException {
/* /*
* Message format (from https://tools.ietf.org/html/rfc4616): * Message format (from https://tools.ietf.org/html/rfc4616):
* *
@ -93,11 +104,11 @@ public class PlainSaslServer implements SaslServer {
String expectedPassword = jaasContext.configEntryOption(JAAS_USER_PREFIX + username, String expectedPassword = jaasContext.configEntryOption(JAAS_USER_PREFIX + username,
PlainLoginModule.class.getName()); PlainLoginModule.class.getName());
if (!password.equals(expectedPassword)) { if (!password.equals(expectedPassword)) {
throw new SaslException("Authentication failed: Invalid username or password"); throw new SaslAuthenticationException("Authentication failed: Invalid username or password");
} }
if (!authorizationIdFromClient.isEmpty() && !authorizationIdFromClient.equals(username)) if (!authorizationIdFromClient.isEmpty() && !authorizationIdFromClient.equals(username))
throw new SaslException("Authentication failed: Client requested an authorization id that is different from username"); throw new SaslAuthenticationException("Authentication failed: Client requested an authorization id that is different from username");
this.authorizationId = username; this.authorizationId = username;

View File

@ -32,6 +32,7 @@ import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory; import javax.security.sasl.SaslServerFactory;
import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage; import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage; import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage; import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
@ -73,8 +74,17 @@ public class ScramSaslServer implements SaslServer {
setState(State.RECEIVE_CLIENT_FIRST_MESSAGE); setState(State.RECEIVE_CLIENT_FIRST_MESSAGE);
} }
/**
* @throws SaslAuthenticationException if the requested authorization id is not the same as username.
* <p>
* <b>Note:</b> This method may throw {@link SaslAuthenticationException} to provide custom error messages
* to clients. But care should be taken to avoid including any information in the exception message that
* should not be leaked to unauthenticated clients. It may be safer to throw {@link SaslException} in
* most cases so that a standard error message is returned to clients.
* </p>
*/
@Override @Override
public byte[] evaluateResponse(byte[] response) throws SaslException { public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException {
try { try {
switch (state) { switch (state) {
case RECEIVE_CLIENT_FIRST_MESSAGE: case RECEIVE_CLIENT_FIRST_MESSAGE:
@ -91,7 +101,7 @@ public class ScramSaslServer implements SaslServer {
throw new SaslException("Authentication failed: Invalid user credentials"); throw new SaslException("Authentication failed: Invalid user credentials");
String authorizationIdFromClient = clientFirstMessage.authorizationId(); String authorizationIdFromClient = clientFirstMessage.authorizationId();
if (!authorizationIdFromClient.isEmpty() && !authorizationIdFromClient.equals(username)) if (!authorizationIdFromClient.isEmpty() && !authorizationIdFromClient.equals(username))
throw new SaslException("Authentication failed: Client requested an authorization id that is different from username"); throw new SaslAuthenticationException("Authentication failed: Client requested an authorization id that is different from username");
if (scramCredential.iterations() < mechanism.minIterations()) if (scramCredential.iterations() < mechanism.minIterations())
throw new SaslException("Iterations " + scramCredential.iterations() + " is less than the minimum " + mechanism.minIterations() + " for " + mechanism); throw new SaslException("Iterations " + scramCredential.iterations() + " is less than the minimum " + mechanism.minIterations() + " for " + mechanism);

View File

@ -79,7 +79,8 @@ public class NetworkTestUtils {
assertTrue(selector.isChannelReady(node)); assertTrue(selector.isChannelReady(node));
} }
public static void waitForChannelClose(Selector selector, String node, ChannelState.State channelState) throws IOException { public static ChannelState waitForChannelClose(Selector selector, String node, ChannelState.State channelState)
throws IOException {
boolean closed = false; boolean closed = false;
for (int i = 0; i < 30; i++) { for (int i = 0; i < 30; i++) {
selector.poll(1000L); selector.poll(1000L);
@ -89,6 +90,8 @@ public class NetworkTestUtils {
} }
} }
assertTrue("Channel was not closed by timeout", closed); assertTrue("Channel was not closed by timeout", closed);
assertEquals(channelState, selector.disconnected().get(node).state()); ChannelState finalState = selector.disconnected().get(node);
assertEquals(channelState, finalState.state());
return finalState;
} }
} }

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.network.CertStores; import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.network.ChannelBuilders;
@ -154,7 +155,8 @@ public class SaslAuthenticatorTest {
jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, "invalidpassword"); jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, "invalidpassword");
server = createEchoServer(securityProtocol); server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node); createAndCheckClientAuthenticationFailure(securityProtocol, node, "PLAIN",
"Authentication failed: Invalid username or password");
server.verifyAuthenticationMetrics(0, 1); server.verifyAuthenticationMetrics(0, 1);
} }
@ -169,7 +171,8 @@ public class SaslAuthenticatorTest {
jaasConfig.setClientOptions("PLAIN", "invaliduser", TestJaasConfig.PASSWORD); jaasConfig.setClientOptions("PLAIN", "invaliduser", TestJaasConfig.PASSWORD);
server = createEchoServer(securityProtocol); server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node); createAndCheckClientAuthenticationFailure(securityProtocol, node, "PLAIN",
"Authentication failed: Invalid username or password");
server.verifyAuthenticationMetrics(0, 1); server.verifyAuthenticationMetrics(0, 1);
} }
@ -307,7 +310,7 @@ public class SaslAuthenticatorTest {
String node = "0"; String node = "0";
server = createEchoServer(securityProtocol); server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createAndCheckClientConnectionFailure(securityProtocol, node); createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
server.verifyAuthenticationMetrics(0, 1); server.verifyAuthenticationMetrics(0, 1);
} }
@ -326,7 +329,7 @@ public class SaslAuthenticatorTest {
String node = "0"; String node = "0";
server = createEchoServer(securityProtocol); server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createAndCheckClientConnectionFailure(securityProtocol, node); createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
server.verifyAuthenticationMetrics(0, 1); server.verifyAuthenticationMetrics(0, 1);
} }
@ -344,7 +347,7 @@ public class SaslAuthenticatorTest {
server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME); server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME);
String node = "1"; String node = "1";
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256"); saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
createAndCheckClientConnectionFailure(securityProtocol, node); createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
server.verifyAuthenticationMetrics(0, 1); server.verifyAuthenticationMetrics(0, 1);
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
@ -1136,11 +1139,24 @@ public class SaslAuthenticatorTest {
selector = null; selector = null;
} }
private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception { private void createAndCheckClientAuthenticationFailure(SecurityProtocol securityProtocol, String node,
String mechanism, String expectedErrorMessage) throws Exception {
ChannelState finalState = createAndCheckClientConnectionFailure(securityProtocol, node);
Exception exception = finalState.exception();
assertTrue("Invalid exception class " + exception.getClass(), exception instanceof SaslAuthenticationException);
if (expectedErrorMessage == null)
expectedErrorMessage = "Authentication failed due to invalid credentials with SASL mechanism " + mechanism;
assertEquals(expectedErrorMessage, exception.getMessage());
}
private ChannelState createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node)
throws Exception {
createClientConnection(securityProtocol, node); createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); ChannelState finalState = NetworkTestUtils.waitForChannelClose(selector, node,
ChannelState.State.AUTHENTICATION_FAILED);
selector.close(); selector.close();
selector = null; selector = null;
return finalState;
} }
private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException { private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {

View File

@ -23,10 +23,9 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import javax.security.sasl.SaslException;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.authenticator.TestJaasConfig; import org.apache.kafka.common.security.authenticator.TestJaasConfig;
@ -62,7 +61,7 @@ public class PlainSaslServerTest {
assertEquals(0, nextChallenge.length); assertEquals(0, nextChallenge.length);
} }
@Test(expected = SaslException.class) @Test(expected = SaslAuthenticationException.class)
public void authorizatonIdNotEqualsAuthenticationId() throws Exception { public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
saslServer.evaluateResponse(saslMessage(USER_B, USER_A, PASSWORD_A)); saslServer.evaluateResponse(saslMessage(USER_B, USER_A, PASSWORD_A));
} }

View File

@ -22,10 +22,9 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import javax.security.sasl.SaslException;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.authenticator.CredentialCache;
public class ScramSaslServerTest { public class ScramSaslServerTest {
@ -60,7 +59,7 @@ public class ScramSaslServerTest {
assertTrue("Next challenge is empty", nextChallenge.length > 0); assertTrue("Next challenge is empty", nextChallenge.length > 0);
} }
@Test(expected = SaslException.class) @Test(expected = SaslAuthenticationException.class)
public void authorizatonIdNotEqualsAuthenticationId() throws Exception { public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
saslServer.evaluateResponse(clientFirstMessage(USER_A, USER_B)); saslServer.evaluateResponse(clientFirstMessage(USER_A, USER_B));
} }

View File

@ -70,6 +70,9 @@
<li>SimpleAclAuthorizer now logs access denials to the authorizer log by default.</li> <li>SimpleAclAuthorizer now logs access denials to the authorizer log by default.</li>
<li>Authentication failures are now reported to clients as one of the subclasses of <code>AuthenticationException</code>. <li>Authentication failures are now reported to clients as one of the subclasses of <code>AuthenticationException</code>.
No retries will be performed if a client connection fails authentication.</li> No retries will be performed if a client connection fails authentication.</li>
<li>Custom <code>SaslServer</code> implementations may throw <code>SaslAuthenticationException</code> to provide an error
message to return to clients indicating the reason for authentication failure. Implementors should take care not to include
any security-critical information in the exception message that should not be leaked to unauthenticated clients.</li>
<li>The <code>app-info</code> mbean registered with JMX to provide version and commit id will be deprecated and replaced with <li>The <code>app-info</code> mbean registered with JMX to provide version and commit id will be deprecated and replaced with
metrics providing these attributes.</li> metrics providing these attributes.</li>
<li>Kafka metrics may now contain non-numeric values. <code>org.apache.kafka.common.Metric#value()</code> has been deprecated and <li>Kafka metrics may now contain non-numeric values. <code>org.apache.kafka.common.Metric#value()</code> has been deprecated and