KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Viktor Somogyi <viktorsomogyi@gmail.com>

Closes #7098 from mimaison/KAFKA-8599
This commit is contained in:
Mickael Maison 2019-08-07 13:32:26 +05:30 committed by Manikumar Reddy
parent 97b731b086
commit 926fb35d9d
9 changed files with 74 additions and 120 deletions

View File

@ -73,6 +73,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
@ -2473,8 +2474,11 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs());
AbstractRequest.Builder<ExpireDelegationTokenRequest> createRequest(int timeoutMs) {
return new ExpireDelegationTokenRequest.Builder(
new ExpireDelegationTokenRequestData()
.setHmac(hmac)
.setExpiryTimePeriodMs(options.expiryTimePeriodMs()));
}
@Override

View File

@ -30,6 +30,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
@ -93,8 +95,6 @@ import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
@ -191,7 +191,7 @@ public enum ApiKeys {
CreatePartitionsResponse.schemaVersions()),
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS),
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS, ExpireDelegationTokenResponseData.SCHEMAS),
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequestData.SCHEMAS, DeleteGroupsResponseData.SCHEMAS),
ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,

View File

@ -290,6 +290,7 @@ public class Struct {
ByteBuffer buf = (ByteBuffer) result;
byte[] arr = new byte[buf.remaining()];
buf.get(arr);
buf.flip();
return arr;
}

View File

@ -151,7 +151,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case RENEW_DELEGATION_TOKEN:
return new RenewDelegationTokenResponse(struct);
case EXPIRE_DELEGATION_TOKEN:
return new ExpireDelegationTokenResponse(struct);
return new ExpireDelegationTokenResponse(struct, version);
case DESCRIBE_DELEGATION_TOKEN:
return new DescribeDelegationTokenResponse(struct);
case DELETE_GROUPS:

View File

@ -16,102 +16,69 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.INT64;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
public class ExpireDelegationTokenRequest extends AbstractRequest {
private static final String HMAC_KEY_NAME = "hmac";
private static final String EXPIRY_TIME_PERIOD_KEY_NAME = "expiry_time_period";
private final ByteBuffer hmac;
private final long expiryTimePeriod;
private final ExpireDelegationTokenRequestData data;
private static final Schema TOKEN_EXPIRE_REQUEST_V0 = new Schema(
new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be expired."),
new Field(EXPIRY_TIME_PERIOD_KEY_NAME, INT64, "expiry time period in milli seconds."));
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema TOKEN_EXPIRE_REQUEST_V1 = TOKEN_EXPIRE_REQUEST_V0;
private ExpireDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) {
private ExpireDelegationTokenRequest(ExpireDelegationTokenRequestData data, short version) {
super(ApiKeys.EXPIRE_DELEGATION_TOKEN, version);
this.hmac = hmac;
this.expiryTimePeriod = renewTimePeriod;
this.data = data;
}
public ExpireDelegationTokenRequest(Struct struct, short versionId) {
super(ApiKeys.EXPIRE_DELEGATION_TOKEN, versionId);
hmac = struct.getBytes(HMAC_KEY_NAME);
expiryTimePeriod = struct.getLong(EXPIRY_TIME_PERIOD_KEY_NAME);
public ExpireDelegationTokenRequest(Struct struct, short version) {
super(ApiKeys.EXPIRE_DELEGATION_TOKEN, version);
this.data = new ExpireDelegationTokenRequestData(struct, version);
}
public static ExpireDelegationTokenRequest parse(ByteBuffer buffer, short version) {
return new ExpireDelegationTokenRequest(ApiKeys.EXPIRE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
}
public static Schema[] schemaVersions() {
return new Schema[] {TOKEN_EXPIRE_REQUEST_V0, TOKEN_EXPIRE_REQUEST_V1};
}
@Override
protected Struct toStruct() {
short version = version();
Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.requestSchema(version));
struct.set(HMAC_KEY_NAME, hmac);
struct.set(EXPIRY_TIME_PERIOD_KEY_NAME, expiryTimePeriod);
return struct;
return data.toStruct(version());
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new ExpireDelegationTokenResponse(throttleTimeMs, Errors.forException(e));
return new ExpireDelegationTokenResponse(
new ExpireDelegationTokenResponseData()
.setErrorCode(Errors.forException(e).code())
.setThrottleTimeMs(throttleTimeMs));
}
public ByteBuffer hmac() {
return hmac;
return ByteBuffer.wrap(data.hmac());
}
public long expiryTimePeriod() {
return expiryTimePeriod;
return data.expiryTimePeriodMs();
}
public static class Builder extends AbstractRequest.Builder<ExpireDelegationTokenRequest> {
private final ByteBuffer hmac;
private final long expiryTimePeriod;
private final ExpireDelegationTokenRequestData data;
public Builder(byte[] hmac, long expiryTimePeriod) {
public Builder(ExpireDelegationTokenRequestData data) {
super(ApiKeys.EXPIRE_DELEGATION_TOKEN);
this.hmac = ByteBuffer.wrap(hmac);
this.expiryTimePeriod = expiryTimePeriod;
this.data = data;
}
@Override
public ExpireDelegationTokenRequest build(short version) {
return new ExpireDelegationTokenRequest(version, hmac, expiryTimePeriod);
return new ExpireDelegationTokenRequest(data, version);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type: ExpireDelegationTokenRequest").
append(", hmac=").append(hmac).
append(", expiryTimePeriod=").append(expiryTimePeriod).
append(")");
return bld.toString();
return data.toString();
}
}
}

View File

@ -16,92 +16,56 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.types.Type.INT64;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
public class ExpireDelegationTokenResponse extends AbstractResponse {
private static final String EXPIRY_TIMESTAMP_KEY_NAME = "expiry_timestamp";
private final ExpireDelegationTokenResponseData data;
private final Errors error;
private final long expiryTimestamp;
private final int throttleTimeMs;
private static final Schema TOKEN_EXPIRE_RESPONSE_V0 = new Schema(
ERROR_CODE,
new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."),
THROTTLE_TIME_MS);
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema TOKEN_EXPIRE_RESPONSE_V1 = TOKEN_EXPIRE_RESPONSE_V0;
public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) {
this.throttleTimeMs = throttleTimeMs;
this.error = error;
this.expiryTimestamp = expiryTimestamp;
public ExpireDelegationTokenResponse(ExpireDelegationTokenResponseData data) {
this.data = data;
}
public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error) {
this(throttleTimeMs, error, -1);
}
public ExpireDelegationTokenResponse(Struct struct) {
error = Errors.forCode(struct.get(ERROR_CODE));
this.expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_KEY_NAME);
this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
public ExpireDelegationTokenResponse(Struct struct, short version) {
this.data = new ExpireDelegationTokenResponseData(struct, version);
}
public static ExpireDelegationTokenResponse parse(ByteBuffer buffer, short version) {
return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
}
public static Schema[] schemaVersions() {
return new Schema[] {TOKEN_EXPIRE_RESPONSE_V0, TOKEN_EXPIRE_RESPONSE_V1};
return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer), version);
}
public Errors error() {
return error;
return Errors.forCode(data.errorCode());
}
public long expiryTimestamp() {
return expiryTimestamp;
return data.expiryTimestampMs();
}
@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(error);
return Collections.singletonMap(error(), 1);
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version));
struct.set(ERROR_CODE, error.code());
struct.set(EXPIRY_TIMESTAMP_KEY_NAME, expiryTimestamp);
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
return struct;
return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
return throttleTimeMs;
return data.throttleTimeMs();
}
public boolean hasError() {
return this.error != Errors.NONE;
return error() != Errors.NONE;
}
@Override

View File

@ -58,6 +58,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@ -1551,11 +1553,18 @@ public class RequestResponseTest {
}
private ExpireDelegationTokenRequest createExpireTokenRequest() {
return new ExpireDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build();
ExpireDelegationTokenRequestData data = new ExpireDelegationTokenRequestData()
.setHmac("test".getBytes())
.setExpiryTimePeriodMs(System.currentTimeMillis());
return new ExpireDelegationTokenRequest.Builder(data).build();
}
private ExpireDelegationTokenResponse createExpireTokenResponse() {
return new ExpireDelegationTokenResponse(20, Errors.NONE, System.currentTimeMillis());
ExpireDelegationTokenResponseData data = new ExpireDelegationTokenResponseData()
.setThrottleTimeMs(20)
.setErrorCode(Errors.NONE.code())
.setExpiryTimestampMs(System.currentTimeMillis());
return new ExpireDelegationTokenResponse(data);
}
private DescribeDelegationTokenRequest createDescribeTokenRequest() {

View File

@ -59,6 +59,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicR
import org.apache.kafka.common.message.DescribeGroupsResponseData
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData
import org.apache.kafka.common.message.FindCoordinatorResponseData
import org.apache.kafka.common.message.HeartbeatResponseData
import org.apache.kafka.common.message.InitProducerIdResponseData
@ -2484,7 +2485,11 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Sending expire token response for correlation id %d to client %s."
.format(request.header.correlationId, request.header.clientId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
new ExpireDelegationTokenResponse(requestThrottleMs, error, expiryTimestamp))
new ExpireDelegationTokenResponse(
new ExpireDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(error.code)
.setExpiryTimestampMs(expiryTimestamp)))
}
if (!allowTokenRequests(request))

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
import org.apache.kafka.common.message.DeleteGroupsRequestData
import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.message.DescribeGroupsRequestData
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData
import org.apache.kafka.common.message.FindCoordinatorRequestData
import org.apache.kafka.common.message.HeartbeatRequestData
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
@ -444,7 +445,10 @@ class RequestQuotaTest extends BaseRequestTest {
)
case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
new ExpireDelegationTokenRequest.Builder(
new ExpireDelegationTokenRequestData()
.setHmac("".getBytes)
.setExpiryTimePeriodMs(1000L))
case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
@ -573,7 +577,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response, ApiKeys.CREATE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response, ApiKeys.EXPIRE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs