mirror of https://github.com/apache/kafka.git
KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) (#18295)
Clients that support SASL but don't implement KIP-43 (eg Kafka producer/consumer 0.9.0.x) will fail to connect after this change. Added unit tests and also manually tested with the console producer 0.9.0. While testing, I noticed that the logged message when a 0.9.0 Java client is used without sasl is slightly misleading - fixed that too. Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
parent
e6d2421136
commit
875da35ec3
|
@ -106,7 +106,7 @@ public class SaslServerAuthenticator implements Authenticator {
|
||||||
* state and likewise ends at either {@link #COMPLETE} or {@link #FAILED}.
|
* state and likewise ends at either {@link #COMPLETE} or {@link #FAILED}.
|
||||||
*/
|
*/
|
||||||
private enum SaslState {
|
private enum SaslState {
|
||||||
INITIAL_REQUEST, // May be GSSAPI token, SaslHandshake or ApiVersions for authentication
|
INITIAL_REQUEST, // May be SaslHandshake or ApiVersions for authentication
|
||||||
HANDSHAKE_OR_VERSIONS_REQUEST, // May be SaslHandshake or ApiVersions
|
HANDSHAKE_OR_VERSIONS_REQUEST, // May be SaslHandshake or ApiVersions
|
||||||
HANDSHAKE_REQUEST, // After an ApiVersions request, next request must be SaslHandshake
|
HANDSHAKE_REQUEST, // After an ApiVersions request, next request must be SaslHandshake
|
||||||
AUTHENTICATE, // Authentication tokens (SaslHandshake v1 and above indicate SaslAuthenticate headers)
|
AUTHENTICATE, // Authentication tokens (SaslHandshake v1 and above indicate SaslAuthenticate headers)
|
||||||
|
@ -277,15 +277,11 @@ public class SaslServerAuthenticator implements Authenticator {
|
||||||
case REAUTH_PROCESS_HANDSHAKE:
|
case REAUTH_PROCESS_HANDSHAKE:
|
||||||
case HANDSHAKE_OR_VERSIONS_REQUEST:
|
case HANDSHAKE_OR_VERSIONS_REQUEST:
|
||||||
case HANDSHAKE_REQUEST:
|
case HANDSHAKE_REQUEST:
|
||||||
|
case INITIAL_REQUEST:
|
||||||
handleKafkaRequest(clientToken);
|
handleKafkaRequest(clientToken);
|
||||||
break;
|
break;
|
||||||
case REAUTH_BAD_MECHANISM:
|
case REAUTH_BAD_MECHANISM:
|
||||||
throw new SaslAuthenticationException(reauthInfo.badMechanismErrorMessage);
|
throw new SaslAuthenticationException(reauthInfo.badMechanismErrorMessage);
|
||||||
case INITIAL_REQUEST:
|
|
||||||
if (handleKafkaRequest(clientToken))
|
|
||||||
break;
|
|
||||||
// For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
|
|
||||||
// This is required for interoperability with 0.9.0.x clients which do not send handshake request
|
|
||||||
case AUTHENTICATE:
|
case AUTHENTICATE:
|
||||||
handleSaslToken(clientToken);
|
handleSaslToken(clientToken);
|
||||||
// When the authentication exchange is complete and no more tokens are expected from the client,
|
// When the authentication exchange is complete and no more tokens are expected from the client,
|
||||||
|
@ -503,63 +499,51 @@ public class SaslServerAuthenticator implements Authenticator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
|
/**
|
||||||
boolean isKafkaRequest = false;
|
* @throws InvalidRequestException if the request is not in Kafka format or if the API key is invalid. Clients
|
||||||
String clientMechanism = null;
|
* that support SASL without support for KIP-43 (e.g. Kafka Clients 0.9.x) are in the former bucket - the first
|
||||||
|
* packet such clients send is a GSSAPI token starting with 0x60.
|
||||||
|
*/
|
||||||
|
private void handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
|
||||||
try {
|
try {
|
||||||
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
||||||
RequestHeader header = RequestHeader.parse(requestBuffer);
|
RequestHeader header = RequestHeader.parse(requestBuffer);
|
||||||
ApiKeys apiKey = header.apiKey();
|
ApiKeys apiKey = header.apiKey();
|
||||||
|
|
||||||
// A valid Kafka request header was received. SASL authentication tokens are now expected only
|
|
||||||
// following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client.
|
|
||||||
if (saslState == SaslState.INITIAL_REQUEST)
|
|
||||||
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
|
|
||||||
isKafkaRequest = true;
|
|
||||||
|
|
||||||
// Raise an error prior to parsing if the api cannot be handled at this layer. This avoids
|
// Raise an error prior to parsing if the api cannot be handled at this layer. This avoids
|
||||||
// unnecessary exposure to some of the more complex schema types.
|
// unnecessary exposure to some of the more complex schema types.
|
||||||
if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE)
|
if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE)
|
||||||
throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
|
throw new InvalidRequestException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
|
||||||
|
|
||||||
LOG.debug("Handling Kafka request {} during {}", apiKey, reauthInfo.authenticationOrReauthenticationText());
|
LOG.debug("Handling Kafka request {} during {}", apiKey, reauthInfo.authenticationOrReauthenticationText());
|
||||||
|
|
||||||
|
|
||||||
RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), Optional.of(clientPort()),
|
RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), Optional.of(clientPort()),
|
||||||
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
|
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
|
||||||
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
|
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
|
||||||
|
|
||||||
|
// A valid Kafka request was received, we can now update the sasl state
|
||||||
|
if (saslState == SaslState.INITIAL_REQUEST)
|
||||||
|
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
|
||||||
|
|
||||||
if (apiKey == ApiKeys.API_VERSIONS)
|
if (apiKey == ApiKeys.API_VERSIONS)
|
||||||
handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
|
handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
|
||||||
else
|
else {
|
||||||
clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
|
String clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
|
||||||
} catch (InvalidRequestException e) {
|
if (!reauthInfo.reauthenticating() || reauthInfo.saslMechanismUnchanged(clientMechanism)) {
|
||||||
if (saslState == SaslState.INITIAL_REQUEST) {
|
|
||||||
// InvalidRequestException is thrown if the request is not in Kafka format or if the API key
|
|
||||||
// is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
|
|
||||||
// starting with 0x60, revert to GSSAPI for both these exceptions.
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
StringBuilder tokenBuilder = new StringBuilder();
|
|
||||||
for (byte b : requestBytes) {
|
|
||||||
tokenBuilder.append(String.format("%02x", b));
|
|
||||||
if (tokenBuilder.length() >= 20)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
|
|
||||||
}
|
|
||||||
if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
|
|
||||||
LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
|
|
||||||
clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
|
|
||||||
} else
|
|
||||||
throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
|
|
||||||
} else
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
if (clientMechanism != null && (!reauthInfo.reauthenticating()
|
|
||||||
|| reauthInfo.saslMechanismUnchanged(clientMechanism))) {
|
|
||||||
createSaslServer(clientMechanism);
|
createSaslServer(clientMechanism);
|
||||||
setSaslState(SaslState.AUTHENTICATE);
|
setSaslState(SaslState.AUTHENTICATE);
|
||||||
}
|
}
|
||||||
return isKafkaRequest;
|
}
|
||||||
|
} catch (InvalidRequestException e) {
|
||||||
|
if (saslState == SaslState.INITIAL_REQUEST) {
|
||||||
|
// InvalidRequestException is thrown if the request is not in Kafka format or if the API key is invalid.
|
||||||
|
// If it's the initial request, this could be an ancient client (see method documentation for more details),
|
||||||
|
// a client configured with the wrong security protocol or a non kafka-client altogether (eg http client).
|
||||||
|
throw new InvalidRequestException("Invalid request, potential reasons: kafka client configured with the " +
|
||||||
|
"wrong security protocol, it does not support KIP-43 or it is not a kafka client.", e);
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
|
private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
|
||||||
|
|
|
@ -17,9 +17,10 @@
|
||||||
package org.apache.kafka.common.security.authenticator;
|
package org.apache.kafka.common.security.authenticator;
|
||||||
|
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
|
||||||
import org.apache.kafka.common.errors.IllegalSaslStateException;
|
import org.apache.kafka.common.errors.InvalidRequestException;
|
||||||
import org.apache.kafka.common.errors.SaslAuthenticationException;
|
import org.apache.kafka.common.errors.SaslAuthenticationException;
|
||||||
import org.apache.kafka.common.message.ApiMessageType;
|
import org.apache.kafka.common.message.ApiMessageType;
|
||||||
|
import org.apache.kafka.common.message.RequestHeaderData;
|
||||||
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
|
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
|
||||||
import org.apache.kafka.common.message.SaslHandshakeRequestData;
|
import org.apache.kafka.common.message.SaslHandshakeRequestData;
|
||||||
import org.apache.kafka.common.network.ChannelBuilders;
|
import org.apache.kafka.common.network.ChannelBuilders;
|
||||||
|
@ -63,6 +64,7 @@ import java.net.InetAddress;
|
||||||
import java.nio.Buffer;
|
import java.nio.Buffer;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -77,7 +79,6 @@ import javax.security.sasl.SaslServer;
|
||||||
import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256;
|
import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyMap;
|
import static org.mockito.ArgumentMatchers.anyMap;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
@ -107,7 +108,7 @@ public class SaslServerAuthenticatorTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnexpectedRequestType() throws IOException {
|
public void testUnexpectedRequestTypeWithValidRequestHeader() throws IOException {
|
||||||
TransportLayer transportLayer = mock(TransportLayer.class);
|
TransportLayer transportLayer = mock(TransportLayer.class);
|
||||||
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
|
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
|
||||||
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
|
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
|
||||||
|
@ -126,13 +127,35 @@ public class SaslServerAuthenticatorTest {
|
||||||
return headerBuffer.remaining();
|
return headerBuffer.remaining();
|
||||||
});
|
});
|
||||||
|
|
||||||
try {
|
assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
|
||||||
authenticator.authenticate();
|
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
|
||||||
fail("Expected authenticate() to raise an exception");
|
|
||||||
} catch (IllegalSaslStateException e) {
|
|
||||||
// expected exception
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvalidRequestHeader() throws IOException {
|
||||||
|
TransportLayer transportLayer = mock(TransportLayer.class);
|
||||||
|
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
|
||||||
|
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
|
||||||
|
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer,
|
||||||
|
SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry());
|
||||||
|
|
||||||
|
short invalidApiKeyId = (short) (Arrays.stream(ApiKeys.values()).mapToInt(k -> k.id).max().getAsInt() + 1);
|
||||||
|
ByteBuffer headerBuffer = RequestTestUtils.serializeRequestHeader(new RequestHeader(
|
||||||
|
new RequestHeaderData()
|
||||||
|
.setRequestApiKey(invalidApiKeyId)
|
||||||
|
.setRequestApiVersion((short) 0),
|
||||||
|
(short) 2));
|
||||||
|
|
||||||
|
when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
|
||||||
|
invocation.<ByteBuffer>getArgument(0).putInt(headerBuffer.remaining());
|
||||||
|
return 4;
|
||||||
|
}).then(invocation -> {
|
||||||
|
// serialize only the request header. the authenticator should not parse beyond this
|
||||||
|
invocation.<ByteBuffer>getArgument(0).put(headerBuffer.duplicate());
|
||||||
|
return headerBuffer.remaining();
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
|
||||||
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
|
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
|
||||||
import org.apache.kafka.common.message.ApiMessageType.ListenerType
|
import org.apache.kafka.common.message.ApiMessageType.ListenerType
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
import org.apache.kafka.common.config.ConfigException
|
import org.apache.kafka.common.config.ConfigException
|
||||||
import org.apache.kafka.common.errors.InvalidRequestException
|
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
|
||||||
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
|
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
|
||||||
import org.apache.kafka.common.metrics._
|
import org.apache.kafka.common.metrics._
|
||||||
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
|
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
|
||||||
|
@ -1107,8 +1107,10 @@ private[kafka] class Processor(
|
||||||
val header = RequestHeader.parse(buffer)
|
val header = RequestHeader.parse(buffer)
|
||||||
if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
|
if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
|
||||||
header
|
header
|
||||||
} else {
|
} else if (header.isApiVersionDeprecated()) {
|
||||||
throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled")
|
throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled")
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedVersionException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not supported")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue