KAFKA-7972: Use automatic RPC generation in SaslHandshake

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #6301 from mimaison/sasl-handshake
This commit is contained in:
Mickael Maison 2019-02-25 11:20:07 +05:30 committed by Manikumar Reddy
parent 12e8b8c2c7
commit 4824dc994d
12 changed files with 86 additions and 116 deletions

View File

@ -102,6 +102,7 @@
<allow pkg="org.apache.kafka.common.protocol" /> <allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.errors" /> <allow pkg="org.apache.kafka.common.errors" />
<subpackage name="authenticator"> <subpackage name="authenticator">
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.protocol.types" /> <allow pkg="org.apache.kafka.common.protocol.types" />
<allow pkg="org.apache.kafka.common.requests" /> <allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.clients" /> <allow pkg="org.apache.kafka.clients" />

View File

@ -22,6 +22,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
@ -97,8 +99,6 @@ import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenResponse; import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.kafka.common.requests.SaslAuthenticateRequest; import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse; import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.requests.StopReplicaRequest; import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.StopReplicaResponse; import org.apache.kafka.common.requests.StopReplicaResponse;
import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupRequest;
@ -142,7 +142,7 @@ public enum ApiKeys {
DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequest.schemaVersions(), DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequest.schemaVersions(),
DescribeGroupsResponse.schemaVersions()), DescribeGroupsResponse.schemaVersions()),
LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()), LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()),
SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequest.schemaVersions(), SaslHandshakeResponse.schemaVersions()), SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS),
API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), ApiVersionsResponse.schemaVersions()) { API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), ApiVersionsResponse.schemaVersions()) {
@Override @Override
public Struct parseResponse(short version, ByteBuffer buffer) { public Struct parseResponse(short version, ByteBuffer buffer) {

View File

@ -105,7 +105,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case LIST_GROUPS: case LIST_GROUPS:
return new ListGroupsResponse(struct); return new ListGroupsResponse(struct);
case SASL_HANDSHAKE: case SASL_HANDSHAKE:
return new SaslHandshakeResponse(struct); return new SaslHandshakeResponse(struct, version);
case API_VERSIONS: case API_VERSIONS:
return new ApiVersionsResponse(struct); return new ApiVersionsResponse(struct);
case CREATE_TOPICS: case CREATE_TOPICS:

View File

@ -16,17 +16,13 @@
*/ */
package org.apache.kafka.common.requests; package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import static org.apache.kafka.common.protocol.types.Type.STRING;
/** /**
* Request from SASL client containing client SASL mechanism. * Request from SASL client containing client SASL mechanism.
@ -38,73 +34,54 @@ import static org.apache.kafka.common.protocol.types.Type.STRING;
* making it easy to distinguish from a GSSAPI packet. * making it easy to distinguish from a GSSAPI packet.
*/ */
public class SaslHandshakeRequest extends AbstractRequest { public class SaslHandshakeRequest extends AbstractRequest {
private static final String MECHANISM_KEY_NAME = "mechanism";
private static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema(
new Field("mechanism", STRING, "SASL Mechanism chosen by the client."));
// SASL_HANDSHAKE_REQUEST_V1 added to support SASL_AUTHENTICATE request to improve diagnostics
private static final Schema SASL_HANDSHAKE_REQUEST_V1 = SASL_HANDSHAKE_REQUEST_V0;
public static Schema[] schemaVersions() {
return new Schema[]{SASL_HANDSHAKE_REQUEST_V0, SASL_HANDSHAKE_REQUEST_V1};
}
private final String mechanism;
public static class Builder extends AbstractRequest.Builder<SaslHandshakeRequest> { public static class Builder extends AbstractRequest.Builder<SaslHandshakeRequest> {
private final String mechanism; private final SaslHandshakeRequestData data;
public Builder(String mechanism) { public Builder(SaslHandshakeRequestData data) {
super(ApiKeys.SASL_HANDSHAKE); super(ApiKeys.SASL_HANDSHAKE);
this.mechanism = mechanism; this.data = data;
} }
@Override @Override
public SaslHandshakeRequest build(short version) { public SaslHandshakeRequest build(short version) {
return new SaslHandshakeRequest(mechanism, version); return new SaslHandshakeRequest(data, version);
} }
@Override @Override
public String toString() { public String toString() {
StringBuilder bld = new StringBuilder(); return data.toString();
bld.append("(type=SaslHandshakeRequest").
append(", mechanism=").append(mechanism).
append(")");
return bld.toString();
} }
} }
public SaslHandshakeRequest(String mechanism) { private final SaslHandshakeRequestData data;
this(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion()); private final short version;
public SaslHandshakeRequest(SaslHandshakeRequestData data) {
this(data, ApiKeys.SASL_HANDSHAKE.latestVersion());
} }
public SaslHandshakeRequest(String mechanism, short version) { public SaslHandshakeRequest(SaslHandshakeRequestData data, short version) {
super(ApiKeys.SASL_HANDSHAKE, version); super(ApiKeys.SASL_HANDSHAKE, version);
this.mechanism = mechanism; this.data = data;
this.version = version;
} }
public SaslHandshakeRequest(Struct struct, short version) { public SaslHandshakeRequest(Struct struct, short version) {
super(ApiKeys.SASL_HANDSHAKE, version); super(ApiKeys.SASL_HANDSHAKE, version);
mechanism = struct.getString(MECHANISM_KEY_NAME); this.data = new SaslHandshakeRequestData(struct, version);
this.version = version;
} }
public String mechanism() { public SaslHandshakeRequestData data() {
return mechanism; return data;
} }
@Override @Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
short versionId = version(); SaslHandshakeResponseData response = new SaslHandshakeResponseData();
switch (versionId) { response.setErrorCode(ApiError.fromThrowable(e).error().code());
case 0: return new SaslHandshakeResponse(response);
case 1:
List<String> enabledMechanisms = Collections.emptyList();
return new SaslHandshakeResponse(Errors.forException(e), enabledMechanisms);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.SASL_HANDSHAKE.latestVersion()));
}
} }
public static SaslHandshakeRequest parse(ByteBuffer buffer, short version) { public static SaslHandshakeRequest parse(ByteBuffer buffer, short version) {
@ -113,9 +90,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
@Override @Override
protected Struct toStruct() { protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.requestSchema(version())); return data.toStruct(version);
struct.set(MECHANISM_KEY_NAME, mechanism);
return struct;
} }
} }

View File

@ -16,84 +16,56 @@
*/ */
package org.apache.kafka.common.requests; package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.Collections;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
/** /**
* Response from SASL server which indicates if the client-chosen mechanism is enabled in the server. * Response from SASL server which indicates if the client-chosen mechanism is enabled in the server.
* For error responses, the list of enabled mechanisms is included in the response. * For error responses, the list of enabled mechanisms is included in the response.
*/ */
public class SaslHandshakeResponse extends AbstractResponse { public class SaslHandshakeResponse extends AbstractResponse {
private static final String ENABLED_MECHANISMS_KEY_NAME = "enabled_mechanisms";
private static final Schema SASL_HANDSHAKE_RESPONSE_V0 = new Schema( private final SaslHandshakeResponseData data;
ERROR_CODE,
new Field(ENABLED_MECHANISMS_KEY_NAME, new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server."));
private static final Schema SASL_HANDSHAKE_RESPONSE_V1 = SASL_HANDSHAKE_RESPONSE_V0; public SaslHandshakeResponse(SaslHandshakeResponseData data) {
this.data = data;
public static Schema[] schemaVersions() {
return new Schema[]{SASL_HANDSHAKE_RESPONSE_V0, SASL_HANDSHAKE_RESPONSE_V1};
} }
/** public SaslHandshakeResponse(Struct struct, short version) {
* Possible error codes: this.data = new SaslHandshakeResponseData(struct, version);
* UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server
* ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake
*/
private final Errors error;
private final List<String> enabledMechanisms;
public SaslHandshakeResponse(Errors error, Collection<String> enabledMechanisms) {
this.error = error;
this.enabledMechanisms = new ArrayList<>(enabledMechanisms);
}
public SaslHandshakeResponse(Struct struct) {
error = Errors.forCode(struct.get(ERROR_CODE));
Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME);
ArrayList<String> enabledMechanisms = new ArrayList<>();
for (Object mechanism : mechanisms)
enabledMechanisms.add((String) mechanism);
this.enabledMechanisms = enabledMechanisms;
} }
/*
* Possible error codes:
* UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server
* ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake
*/
public Errors error() { public Errors error() {
return error; return Errors.forCode(data.errorCode());
} }
@Override @Override
public Map<Errors, Integer> errorCounts() { public Map<Errors, Integer> errorCounts() {
return errorCounts(error); return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
} }
@Override @Override
public Struct toStruct(short version) { public Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version)); return data.toStruct(version);
struct.set(ERROR_CODE, error.code());
struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray());
return struct;
} }
public List<String> enabledMechanisms() { public List<String> enabledMechanisms() {
return enabledMechanisms; return data.mechanisms();
} }
public static SaslHandshakeResponse parse(ByteBuffer buffer, short version) { public static SaslHandshakeResponse parse(ByteBuffer buffer, short version) {
return new SaslHandshakeResponse(ApiKeys.SASL_HANDSHAKE.parseResponse(version, buffer)); return new SaslHandshakeResponse(ApiKeys.SASL_HANDSHAKE.parseResponse(version, buffer), version);
} }
} }

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.ReauthenticationContext; import org.apache.kafka.common.network.ReauthenticationContext;
@ -328,7 +329,8 @@ public class SaslClientAuthenticator implements Authenticator {
// Visible to override for testing // Visible to override for testing
protected SaslHandshakeRequest createSaslHandshakeRequest(short version) { protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
return new SaslHandshakeRequest.Builder(mechanism).build(version); return new SaslHandshakeRequest.Builder(
new SaslHandshakeRequestData().setMechanism(mechanism)).build(version);
} }
// Visible to override for testing // Visible to override for testing

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.ListenerName;
@ -35,7 +36,6 @@ import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.ApiVersionsResponse;
@ -50,6 +50,7 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext; import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.kerberos.KerberosError; import org.apache.kafka.common.security.kerberos.KerberosError;
import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
@ -77,12 +78,12 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.security.PrivilegedActionException; import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
public class SaslServerAuthenticator implements Authenticator { public class SaslServerAuthenticator implements Authenticator {
// GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
@ -118,7 +119,7 @@ public class SaslServerAuthenticator implements Authenticator {
private final String connectionId; private final String connectionId;
private final Map<String, Subject> subjects; private final Map<String, Subject> subjects;
private final TransportLayer transportLayer; private final TransportLayer transportLayer;
private final Set<String> enabledMechanisms; private final List<String> enabledMechanisms;
private final Map<String, ?> configs; private final Map<String, ?> configs;
private final KafkaPrincipalBuilder principalBuilder; private final KafkaPrincipalBuilder principalBuilder;
private final Map<String, AuthenticateCallbackHandler> callbackHandlers; private final Map<String, AuthenticateCallbackHandler> callbackHandlers;
@ -168,8 +169,8 @@ public class SaslServerAuthenticator implements Authenticator {
List<String> enabledMechanisms = (List<String>) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG); List<String> enabledMechanisms = (List<String>) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
if (enabledMechanisms == null || enabledMechanisms.isEmpty()) if (enabledMechanisms == null || enabledMechanisms.isEmpty())
throw new IllegalArgumentException("No SASL mechanisms are enabled"); throw new IllegalArgumentException("No SASL mechanisms are enabled");
this.enabledMechanisms = new HashSet<>(enabledMechanisms); this.enabledMechanisms = new ArrayList<String>(new HashSet<String>(enabledMechanisms));
for (String mechanism : enabledMechanisms) { for (String mechanism : this.enabledMechanisms) {
if (!callbackHandlers.containsKey(mechanism)) if (!callbackHandlers.containsKey(mechanism))
throw new IllegalArgumentException("Callback handler not specified for SASL mechanism " + mechanism); throw new IllegalArgumentException("Callback handler not specified for SASL mechanism " + mechanism);
if (!subjects.containsKey(mechanism)) if (!subjects.containsKey(mechanism))
@ -538,17 +539,19 @@ public class SaslServerAuthenticator implements Authenticator {
} }
private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException { private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
String clientMechanism = handshakeRequest.mechanism(); String clientMechanism = handshakeRequest.data().mechanism();
short version = context.header.apiVersion(); short version = context.header.apiVersion();
if (version >= 1) if (version >= 1)
this.enableKafkaSaslAuthenticateHeaders(true); this.enableKafkaSaslAuthenticateHeaders(true);
if (enabledMechanisms.contains(clientMechanism)) { if (enabledMechanisms.contains(clientMechanism)) {
LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism); LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism);
sendKafkaResponse(context, new SaslHandshakeResponse(Errors.NONE, enabledMechanisms)); sendKafkaResponse(context, new SaslHandshakeResponse(
new SaslHandshakeResponseData().setErrorCode(Errors.NONE.code()).setMechanisms(enabledMechanisms)));
return clientMechanism; return clientMechanism;
} else { } else {
LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism); LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism);
buildResponseOnAuthenticateFailure(context, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms)); buildResponseOnAuthenticateFailure(context, new SaslHandshakeResponse(
new SaslHandshakeResponseData().setErrorCode(Errors.UNSUPPORTED_SASL_MECHANISM.code()).setMechanisms(enabledMechanisms)));
throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism); throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism);
} }
} }

View File

@ -45,6 +45,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.Partiti
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Send; import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
@ -1011,11 +1013,14 @@ public class RequestResponseTest {
} }
private SaslHandshakeRequest createSaslHandshakeRequest() { private SaslHandshakeRequest createSaslHandshakeRequest() {
return new SaslHandshakeRequest("PLAIN"); return new SaslHandshakeRequest.Builder(
new SaslHandshakeRequestData().setMechanism("PLAIN")).build();
} }
private SaslHandshakeResponse createSaslHandshakeResponse() { private SaslHandshakeResponse createSaslHandshakeResponse() {
return new SaslHandshakeResponse(Errors.NONE, singletonList("GSSAPI")); return new SaslHandshakeResponse(
new SaslHandshakeResponseData()
.setErrorCode(Errors.NONE.code()).setMechanisms(singletonList("GSSAPI")));
} }
private SaslAuthenticateRequest createSaslAuthenticateRequest() { private SaslAuthenticateRequest createSaslAuthenticateRequest() {

View File

@ -53,6 +53,7 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.CertStores; import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.network.ChannelBuilders;
@ -708,8 +709,9 @@ public class SaslAuthenticatorTest {
// Send SaslHandshakeRequest and validate that connection is closed by server. // Send SaslHandshakeRequest and validate that connection is closed by server.
String node1 = "invalid1"; String node1 = "invalid1";
createClientConnection(SecurityProtocol.PLAINTEXT, node1); createClientConnection(SecurityProtocol.PLAINTEXT, node1);
SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN"); SaslHandshakeRequest request = buildSaslHandshakeRequest("PLAIN", ApiKeys.SASL_HANDSHAKE.latestVersion());
RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, Short.MAX_VALUE, "someclient", 2); RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, Short.MAX_VALUE, "someclient", 2);
selector.send(request.toSend(node1, header)); selector.send(request.toSend(node1, header));
// This test uses a non-SASL PLAINTEXT client in order to do manual handshake. // This test uses a non-SASL PLAINTEXT client in order to do manual handshake.
// So the channel is in READY state. // So the channel is in READY state.
@ -1715,7 +1717,7 @@ public class SaslAuthenticatorTest {
servicePrincipal, serverHost, saslMechanism, true, transportLayer, time) { servicePrincipal, serverHost, saslMechanism, true, transportLayer, time) {
@Override @Override
protected SaslHandshakeRequest createSaslHandshakeRequest(short version) { protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
return new SaslHandshakeRequest.Builder(saslMechanism).build((short) 0); return buildSaslHandshakeRequest(saslMechanism, (short) 0);
} }
@Override @Override
protected void saslAuthenticateVersion(ApiVersionsResponse apiVersionsResponse) { protected void saslAuthenticateVersion(ApiVersionsResponse apiVersionsResponse) {
@ -1927,7 +1929,7 @@ public class SaslAuthenticatorTest {
} }
private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node, short version) throws Exception { private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node, short version) throws Exception {
SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest.Builder("PLAIN").build(version); SaslHandshakeRequest handshakeRequest = buildSaslHandshakeRequest("PLAIN", version);
SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest); SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
assertEquals(Errors.NONE, response.error()); assertEquals(Errors.NONE, response.error());
return response; return response;
@ -1971,6 +1973,11 @@ public class SaslAuthenticatorTest {
} }
} }
private SaslHandshakeRequest buildSaslHandshakeRequest(String mechanism, short version) {
return new SaslHandshakeRequest.Builder(
new SaslHandshakeRequestData().setMechanism(mechanism)).build(version);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void updateScramCredentialCache(String username, String password) throws NoSuchAlgorithmException { private void updateScramCredentialCache(String username, String password) throws NoSuchAlgorithmException {
for (String mechanism : (List<String>) saslServerConfigs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG)) { for (String mechanism : (List<String>) saslServerConfigs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG)) {
@ -2228,7 +2235,8 @@ public class SaslAuthenticatorTest {
"PLAIN", true, transportLayer, time) { "PLAIN", true, transportLayer, time) {
@Override @Override
protected SaslHandshakeRequest createSaslHandshakeRequest(short version) { protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
return new SaslHandshakeRequest.Builder("PLAIN").build(version); return new SaslHandshakeRequest.Builder(
new SaslHandshakeRequestData().setMechanism("PLAIN")).build(version);
} }
}; };
} }

View File

@ -48,6 +48,7 @@ import org.apache.kafka.common.message.CreateTopicsResponseData
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet} import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
import org.apache.kafka.common.message.LeaveGroupResponseData import org.apache.kafka.common.message.LeaveGroupResponseData
import org.apache.kafka.common.message.SaslHandshakeResponseData
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -1359,7 +1360,8 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
def handleSaslHandshakeRequest(request: RequestChannel.Request) { def handleSaslHandshakeRequest(request: RequestChannel.Request) {
sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet())) val responseData = new SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code())
sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(responseData))
} }
def handleSaslAuthenticateRequest(request: RequestChannel.Request) { def handleSaslAuthenticateRequest(request: RequestChannel.Request) {

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceP
import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet} import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.protocol.ApiKeys
@ -273,7 +274,7 @@ class RequestQuotaTest extends BaseRequestTest {
new ListGroupsRequest.Builder() new ListGroupsRequest.Builder()
case ApiKeys.SASL_HANDSHAKE => case ApiKeys.SASL_HANDSHAKE =>
new SaslHandshakeRequest.Builder("PLAIN") new SaslHandshakeRequest.Builder(new SaslHandshakeRequestData().setMechanism("PLAIN"))
case ApiKeys.SASL_AUTHENTICATE => case ApiKeys.SASL_AUTHENTICATE =>
new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(new Array[Byte](0))) new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(new Array[Byte](0)))

View File

@ -19,6 +19,7 @@ package kafka.server
import java.net.Socket import java.net.Socket
import java.util.Collections import java.util.Collections
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
import org.apache.kafka.common.requests.SaslHandshakeRequest import org.apache.kafka.common.requests.SaslHandshakeRequest
@ -95,7 +96,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup {
} }
private def sendSaslHandshakeRequestValidateResponse(socket: Socket) { private def sendSaslHandshakeRequestValidateResponse(socket: Socket) {
val request = new SaslHandshakeRequest("PLAIN") val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"))
val response = sendAndReceive(request, ApiKeys.SASL_HANDSHAKE, socket) val response = sendAndReceive(request, ApiKeys.SASL_HANDSHAKE, socket)
val handshakeResponse = SaslHandshakeResponse.parse(response, request.version) val handshakeResponse = SaslHandshakeResponse.parse(response, request.version)
assertEquals(Errors.NONE, handshakeResponse.error) assertEquals(Errors.NONE, handshakeResponse.error)