MINOR: Use generated InitProducerId RPC (#6538)

This patch updates the InitProducerId request API to use the generated message sources. It also fixes a small bug in the DescribeAclsRequest class, which was passing the wrong API key (DELETE_ACLS instead of DESCRIBE_ACLS) to the superclass constructor.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Colin McCabe <cmccabe@apache.org>
Jason Gustafson, 2019-04-11 08:27:08 -07:00 (committed by GitHub)
parent 9f5a69a4c2
commit 53e95ffcdb
16 changed files with 161 additions and 211 deletions
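For context, the core of the change is replacing the hand-written request/response schema code with the generated InitProducerIdRequestData and InitProducerIdResponseData message classes. A minimal sketch of the new construction path follows (the wrapper class and the literal values are illustrative only; the builder, setters, and public data field are taken from the diffs below):

import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.requests.InitProducerIdRequest;

public class InitProducerIdExample {
    public static void main(String[] args) {
        // The generated data class replaces the old (transactionalId, transactionTimeoutMs)
        // builder arguments; its setters are chainable.
        InitProducerIdRequestData data = new InitProducerIdRequestData()
                .setTransactionalId("my-transactional-id") // null for an idempotent-only producer
                .setTransactionTimeoutMs(60000);

        // Argument validation (positive timeout, null or non-empty id) now happens in build().
        InitProducerIdRequest request = new InitProducerIdRequest.Builder(data).build((short) 1);
        System.out.println(request.data);
    }
}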

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -480,7 +481,10 @@ public class Sender implements Runnable {
     private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException {
         String nodeId = node.idString();
-        InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
+        InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                .setTransactionalId(null)
+                .setTransactionTimeoutMs(Integer.MAX_VALUE);
+        InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(requestData);
         ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, requestTimeoutMs, null);
         return NetworkClientUtils.sendAndReceive(client, request, time);
     }
@@ -504,7 +508,7 @@
                 Errors error = initProducerIdResponse.error();
                 if (error == Errors.NONE) {
                     ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
-                            initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+                            initProducerIdResponse.data.producerId(), initProducerIdResponse.data.producerEpoch());
                     transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
                     return;
                 } else if (error.exception() instanceof RetriableException) {

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
@@ -254,8 +255,10 @@
         return handleCachedTransactionRequestResult(() -> {
             transitionTo(State.INITIALIZING);
             setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
-            InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
-            InitProducerIdHandler handler = new InitProducerIdHandler(builder);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData));
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING);
@@ -1020,7 +1023,8 @@
             Errors error = initProducerIdResponse.error();
             if (error == Errors.NONE) {
-                ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
+                ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data.producerId(),
+                        initProducerIdResponse.data.producerEpoch());
                 setProducerIdAndEpoch(producerIdAndEpoch);
                 transitionTo(State.READY);
                 lastError = null;

clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

@@ -26,6 +26,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -81,8 +83,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
-import org.apache.kafka.common.requests.InitProducerIdRequest;
-import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.requests.LeaderAndIsrResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
@@ -155,8 +155,7 @@
     CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS),
     DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS),
     DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
-    INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(),
-            InitProducerIdResponse.schemaVersions()),
+    INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequestData.SCHEMAS, InitProducerIdResponseData.SCHEMAS),
     OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false, OffsetsForLeaderEpochRequest.schemaVersions(),
             OffsetsForLeaderEpochResponse.schemaVersions()),
     ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,

clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

@@ -76,11 +76,13 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
     }
 
     private final short version;
+    public final ApiKeys api;
 
     public AbstractRequest(ApiKeys api, short version) {
         if (!api.isVersionSupported(version))
             throw new UnsupportedVersionException("The " + api + " protocol does not support version " + version);
         this.version = version;
+        this.api = api;
     }
 
     /**

clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

@@ -115,7 +115,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case DELETE_RECORDS:
                 return new DeleteRecordsResponse(struct);
             case INIT_PRODUCER_ID:
-                return new InitProducerIdResponse(struct);
+                return new InitProducerIdResponse(struct, version);
             case OFFSET_FOR_LEADER_EPOCH:
                 return new OffsetsForLeaderEpochResponse(struct);
             case ADD_PARTITIONS_TO_TXN:

clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java

@@ -17,12 +17,12 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -50,7 +50,6 @@
     }
 
     private final CreateTopicsRequestData data;
-    private final short version;
 
     public static final int NO_NUM_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
@@ -58,13 +57,11 @@
     private CreateTopicsRequest(CreateTopicsRequestData data, short version) {
         super(ApiKeys.CREATE_TOPICS, version);
         this.data = data;
-        this.version = version;
     }
 
     public CreateTopicsRequest(Struct struct, short version) {
         super(ApiKeys.CREATE_TOPICS, version);
         this.data = new CreateTopicsRequestData(struct, version);
-        this.version = version;
     }
 
     public CreateTopicsRequestData data() {
@@ -96,6 +93,6 @@
      */
     @Override
     public Struct toStruct() {
-        return data.toStruct(version);
+        return data.toStruct(version());
     }
 }

clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java

@@ -86,14 +86,14 @@
     private final AclBindingFilter filter;
 
     DescribeAclsRequest(AclBindingFilter filter, short version) {
-        super(ApiKeys.DELETE_ACLS, version);
+        super(ApiKeys.DESCRIBE_ACLS, version);
         this.filter = filter;
         validate(filter, version);
     }
 
     public DescribeAclsRequest(Struct struct, short version) {
-        super(ApiKeys.DELETE_ACLS, version);
+        super(ApiKeys.DESCRIBE_ACLS, version);
         ResourcePatternFilter resourceFilter = RequestUtils.resourcePatternFilterFromStructFields(struct);
         AccessControlEntryFilter entryFilter = RequestUtils.aceFilterFromStructFields(struct);
         this.filter = new AclBindingFilter(resourceFilter, entryFilter);

clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java

@@ -16,106 +16,71 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class InitProducerIdRequest extends AbstractRequest {
-    public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
-    private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
-
-    private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
-            NULLABLE_TRANSACTIONAL_ID,
-            new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema INIT_PRODUCER_ID_REQUEST_V1 = INIT_PRODUCER_ID_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{INIT_PRODUCER_ID_REQUEST_V0, INIT_PRODUCER_ID_REQUEST_V1};
-    }
-
-    private final String transactionalId;
-    private final int transactionTimeoutMs;
-
     public static class Builder extends AbstractRequest.Builder<InitProducerIdRequest> {
-        private final String transactionalId;
-        private final int transactionTimeoutMs;
+        private final InitProducerIdRequestData data;
 
-        public Builder(String transactionalId) {
-            this(transactionalId, NO_TRANSACTION_TIMEOUT_MS);
-        }
-
-        public Builder(String transactionalId, int transactionTimeoutMs) {
+        public Builder(InitProducerIdRequestData data) {
             super(ApiKeys.INIT_PRODUCER_ID);
-            if (transactionTimeoutMs <= 0)
-                throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs);
-
-            if (transactionalId != null && transactionalId.isEmpty())
-                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
-
-            this.transactionalId = transactionalId;
-            this.transactionTimeoutMs = transactionTimeoutMs;
+            this.data = data;
         }
 
         @Override
         public InitProducerIdRequest build(short version) {
-            return new InitProducerIdRequest(version, transactionalId, transactionTimeoutMs);
+            if (data.transactionTimeoutMs() <= 0)
+                throw new IllegalArgumentException("transaction timeout value is not positive: " + data.transactionTimeoutMs());
+
+            if (data.transactionalId() != null && data.transactionalId().isEmpty())
+                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+
+            return new InitProducerIdRequest(data, version);
        }
 
         @Override
         public String toString() {
-            return "(type=InitProducerIdRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
-                    transactionTimeoutMs + ")";
+            return data.toString();
         }
     }
 
+    public final InitProducerIdRequestData data;
+
+    private InitProducerIdRequest(InitProducerIdRequestData data, short version) {
+        super(ApiKeys.INIT_PRODUCER_ID, version);
+        this.data = data;
+    }
+
     public InitProducerIdRequest(Struct struct, short version) {
         super(ApiKeys.INIT_PRODUCER_ID, version);
-        this.transactionalId = struct.get(NULLABLE_TRANSACTIONAL_ID);
-        this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
-    }
-
-    private InitProducerIdRequest(short version, String transactionalId, int transactionTimeoutMs) {
-        super(ApiKeys.INIT_PRODUCER_ID, version);
-        this.transactionalId = transactionalId;
-        this.transactionTimeoutMs = transactionTimeoutMs;
+        this.data = new InitProducerIdRequestData(struct, version);
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        return new InitProducerIdResponse(throttleTimeMs, Errors.forException(e));
+        InitProducerIdResponseData response = new InitProducerIdResponseData()
+                .setErrorCode(Errors.forException(e).code())
+                .setProducerId(RecordBatch.NO_PRODUCER_ID)
+                .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
+                .setThrottleTimeMs(0);
+        return new InitProducerIdResponse(response);
     }
 
     public static InitProducerIdRequest parse(ByteBuffer buffer, short version) {
         return new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
     }
 
-    public String transactionalId() {
-        return transactionalId;
-    }
-
-    public int transactionTimeoutMs() {
-        return transactionTimeoutMs;
-    }
-
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
-        struct.set(NULLABLE_TRANSACTIONAL_ID, transactionalId);
-        struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
-        return struct;
+        return data.toStruct(version());
     }
 }

clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java

@@ -16,110 +16,59 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-
+/**
+ * Possible error codes:
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
+ * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+ */
 public class InitProducerIdResponse extends AbstractResponse {
-    // Possible error codes:
-    //   NotCoordinator
-    //   CoordinatorNotAvailable
-    //   CoordinatorLoadInProgress
-    //   TransactionalIdAuthorizationFailed
-    //   ClusterAuthorizationFailed
+    public final InitProducerIdResponseData data;
 
-    private static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
-            THROTTLE_TIME_MS,
-            ERROR_CODE,
-            PRODUCER_ID,
-            PRODUCER_EPOCH);
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema INIT_PRODUCER_ID_RESPONSE_V1 = INIT_PRODUCER_ID_RESPONSE_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0, INIT_PRODUCER_ID_RESPONSE_V1};
+    public InitProducerIdResponse(InitProducerIdResponseData data) {
+        this.data = data;
     }
 
-    private final int throttleTimeMs;
-    private final Errors error;
-    private final long producerId;
-    private final short epoch;
-
-    public InitProducerIdResponse(int throttleTimeMs, Errors error, long producerId, short epoch) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.error = error;
-        this.producerId = producerId;
-        this.epoch = epoch;
-    }
-
-    public InitProducerIdResponse(Struct struct) {
-        this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        this.error = Errors.forCode(struct.get(ERROR_CODE));
-        this.producerId = struct.get(PRODUCER_ID);
-        this.epoch = struct.get(PRODUCER_EPOCH);
-    }
-
-    public InitProducerIdResponse(int throttleTimeMs, Errors errors) {
-        this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    public InitProducerIdResponse(Struct struct, short version) {
+        this.data = new InitProducerIdResponseData(struct, version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public long producerId() {
-        return producerId;
-    }
-
-    public Errors error() {
-        return error;
+        return data.throttleTimeMs();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
-    }
-
-    public short epoch() {
-        return epoch;
+        return errorCounts(Errors.forCode(data.errorCode()));
    }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
-        struct.set(PRODUCER_ID, producerId);
-        struct.set(PRODUCER_EPOCH, epoch);
-        struct.set(ERROR_CODE, error.code());
-        return struct;
+        return data.toStruct(version);
     }
 
     public static InitProducerIdResponse parse(ByteBuffer buffer, short version) {
-        return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
+        return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer), version);
     }
 
     @Override
     public String toString() {
-        return "InitProducerIdResponse(" +
-                "error=" + error +
-                ", producerId=" + producerId +
-                ", producerEpoch=" + epoch +
-                ", throttleTimeMs=" + throttleTimeMs +
-                ')';
+        return data.toString();
     }
 
+    public Errors error() {
+        return Errors.forCode(data.errorCode());
+    }
+
     @Override

clients/src/main/resources/common/message/InitProducerIdRequest.json

@@ -23,6 +23,6 @@
     { "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+",
       "about": "The transactional id, or null if the producer is not transactional." },
     { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
-      "about": "The time in ms to wait for before aborting idle transactions sent by this producer." }
+      "about": "The time in ms to wait for before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." }
   ]
 }

clients/src/main/resources/common/message/InitProducerIdResponse.json

@@ -25,7 +25,7 @@
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
     { "name": "ProducerId", "type": "int64", "versions": "0+",
-      "about": "The current producer id." },
+      "default": -1, "about": "The current producer id." },
     { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "The current epoch associated with the producer id." }
   ]

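The "default": -1 added above means an unset ProducerId deserializes to -1, matching the RecordBatch.NO_PRODUCER_ID sentinel used in getErrorResponse earlier in this patch. A minimal sketch of building the generated response data (setters and constructor are from the diffs; the literal values are illustrative):

InitProducerIdResponseData responseData = new InitProducerIdResponseData()
        .setErrorCode(Errors.NONE.code())
        .setProducerId(1234L)
        .setProducerEpoch((short) 0)
        .setThrottleTimeMs(0);
InitProducerIdResponse response = new InitProducerIdResponse(responseData);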
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

@@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -108,7 +109,6 @@ import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.spy;
 
 public class SenderTest {
     private static final int MAX_REQUEST_SIZE = 1024 * 1024;
     private static final short ACKS_ALL = -1;
     private static final String CLIENT_ID = "clientId";
@@ -2321,15 +2321,22 @@
         if (error != Errors.NONE)
             producerEpoch = RecordBatch.NO_PRODUCER_EPOCH;
 
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                return body instanceof InitProducerIdRequest && ((InitProducerIdRequest) body).transactionalId() == null;
-            }
-        }, new InitProducerIdResponse(0, error, producerId, producerEpoch));
+        client.prepareResponse(body -> {
+            return body instanceof InitProducerIdRequest &&
+                    ((InitProducerIdRequest) body).data.transactionalId() == null;
+        }, initProducerIdResponse(producerId, producerEpoch, error));
         sender.run(time.milliseconds());
     }
 
+    private InitProducerIdResponse initProducerIdResponse(long producerId, short producerEpoch, Errors error) {
+        InitProducerIdResponseData responseData = new InitProducerIdResponseData()
+                .setErrorCode(error.code())
+                .setProducerEpoch(producerEpoch)
+                .setProducerId(producerId)
+                .setThrottleTimeMs(0);
+        return new InitProducerIdResponse(responseData);
+    }
+
     private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
         transactionManager.initializeTransactions();
         prepareFindCoordinatorResponse(Errors.NONE);
@@ -2345,8 +2352,8 @@
         client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
     }
 
-    private void prepareInitPidResponse(Errors error, long pid, short epoch) {
-        client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch));
+    private void prepareInitPidResponse(Errors error, long producerId, short producerEpoch) {
+        client.prepareResponse(initProducerIdResponse(producerId, producerEpoch, error));
     }
 
     private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType)

clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -703,8 +704,8 @@
         client.prepareUnsupportedVersionResponse(body -> {
             InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
-            assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
-            assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
+            assertEquals(initProducerIdRequest.data.transactionalId(), transactionalId);
+            assertEquals(initProducerIdRequest.data.transactionTimeoutMs(), transactionTimeoutMs);
             return true;
         });
@@ -2381,21 +2382,26 @@
         }, new FindCoordinatorResponse(error, brokerNode), shouldDisconnect);
     }
 
-    private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long pid, short epoch) {
+    private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
+        InitProducerIdResponseData responseData = new InitProducerIdResponseData()
+                .setErrorCode(error.code())
+                .setProducerEpoch(producerEpoch)
+                .setProducerId(producerId)
+                .setThrottleTimeMs(0);
         client.prepareResponse(body -> {
             InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
-            assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
-            assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
+            assertEquals(initProducerIdRequest.data.transactionalId(), transactionalId);
+            assertEquals(initProducerIdRequest.data.transactionTimeoutMs(), transactionTimeoutMs);
             return true;
-        }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
+        }, new InitProducerIdResponse(responseData), shouldDisconnect);
     }
 
-    private void sendProduceResponse(Errors error, final long pid, final short epoch) {
-        client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+    private void sendProduceResponse(Errors error, final long producerId, final short producerEpoch) {
+        client.respond(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0));
     }
 
-    private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
-        client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+    private void prepareProduceResponse(Errors error, final long producerId, final short producerEpoch) {
+        client.prepareResponse(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0));
     }
 
     private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) {
         return body -> {

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@@ -52,6 +52,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.InitProducerIdRequestData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -389,30 +391,39 @@
         checkResponse(req.getErrorResponse(e), req.version());
     }
 
-    private void checkRequest(AbstractRequest req) throws Exception {
+    private void checkRequest(AbstractRequest req) {
         // Check that we can serialize, deserialize and serialize again
         // We don't check for equality or hashCode because it is likely to fail for any request containing a HashMap
         checkRequest(req, false);
     }
 
-    private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) throws Exception {
+    private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) {
         // Check that we can serialize, deserialize and serialize again
         // Check for equality and hashCode only if indicated
-        Struct struct = req.toStruct();
-        AbstractRequest deserialized = (AbstractRequest) deserialize(req, struct, req.version());
-        Struct struct2 = deserialized.toStruct();
-        if (checkEqualityAndHashCode) {
-            assertEquals(struct, struct2);
-            assertEquals(struct.hashCode(), struct2.hashCode());
+        try {
+            Struct struct = req.toStruct();
+            AbstractRequest deserialized = AbstractRequest.parseRequest(req.api, req.version(), struct);
+            Struct struct2 = deserialized.toStruct();
+            if (checkEqualityAndHashCode) {
+                assertEquals(struct, struct2);
+                assertEquals(struct.hashCode(), struct2.hashCode());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to deserialize request " + req + " with type " + req.getClass(), e);
         }
     }
 
     private void checkResponse(AbstractResponse response, int version) throws Exception {
         // Check that we can serialize, deserialize and serialize again
         // We don't check for equality or hashCode because it is likely to fail for any response containing a HashMap
-        Struct struct = response.toStruct((short) version);
-        AbstractResponse deserialized = (AbstractResponse) deserialize(response, struct, (short) version);
-        Struct struct2 = deserialized.toStruct((short) version);
+        try {
+            Struct struct = response.toStruct((short) version);
+            AbstractResponse deserialized = (AbstractResponse) deserialize(response, struct, (short) version);
+            Struct struct2 = deserialized.toStruct((short) version);
+            assertEquals(struct2, struct);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to deserialize response " + response + " with type " + response.getClass(), e);
+        }
     }
private AbstractRequestResponse deserialize(AbstractRequestResponse req, Struct struct, short version) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
@@ -1167,14 +1178,21 @@
     }
 
     private InitProducerIdRequest createInitPidRequest() {
-        return new InitProducerIdRequest.Builder(null, 100).build();
+        InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                .setTransactionalId(null)
+                .setTransactionTimeoutMs(100);
+        return new InitProducerIdRequest.Builder(requestData).build();
     }
 
     private InitProducerIdResponse createInitPidResponse() {
-        return new InitProducerIdResponse(0, Errors.NONE, 3332, (short) 3);
+        InitProducerIdResponseData responseData = new InitProducerIdResponseData()
+                .setErrorCode(Errors.NONE.code())
+                .setProducerEpoch((short) 3)
+                .setProducerId(3332)
+                .setThrottleTimeMs(0);
+        return new InitProducerIdResponse(responseData);
     }
 
     private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
         Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>();

core/src/main/scala/kafka/server/KafkaApis.scala

@@ -45,16 +45,9 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.CreateTopicsResponseData
 import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
-import org.apache.kafka.common.message.DeleteTopicsResponseData
+import org.apache.kafka.common.message.{CreateTopicsResponseData, DeleteTopicsResponseData, DescribeGroupsResponseData, ElectPreferredLeadersResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData}
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet}
-import org.apache.kafka.common.message.DescribeGroupsResponseData
-import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
-import org.apache.kafka.common.message.JoinGroupResponseData
-import org.apache.kafka.common.message.LeaveGroupResponseData
-import org.apache.kafka.common.message.SaslAuthenticateResponseData
-import org.apache.kafka.common.message.SaslHandshakeResponseData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -1680,7 +1673,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
     val initProducerIdRequest = request.body[InitProducerIdRequest]
-    val transactionalId = initProducerIdRequest.transactionalId
+    val transactionalId = initProducerIdRequest.data.transactionalId
 
     if (transactionalId != null) {
       if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL))) {
@@ -1694,13 +1687,18 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(result: InitProducerIdResult): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
+        val responseData = new InitProducerIdResponseData()
+          .setProducerId(result.producerId)
+          .setProducerEpoch(result.producerEpoch)
+          .setThrottleTimeMs(requestThrottleMs)
+          .setErrorCode(result.error.code)
+        val responseBody = new InitProducerIdResponse(responseData)
         trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
     }
-    txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
+    txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.data.transactionTimeoutMs, sendResponseCallback)
   }
 
   def handleEndTxnRequest(request: RequestChannel.Request): Unit = {

core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@@ -24,13 +24,11 @@ import kafka.security.auth._
 import kafka.utils.TestUtils
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, InitProducerIdRequestData, JoinGroupRequestData, LeaveGroupRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.ControlledShutdownRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
-import org.apache.kafka.common.message.SaslAuthenticateRequestData
-import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
@@ -321,7 +319,10 @@
           new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava)
 
         case ApiKeys.INIT_PRODUCER_ID =>
-          new InitProducerIdRequest.Builder("abc")
+          val requestData = new InitProducerIdRequestData()
+            .setTransactionalId("test-transactional-id")
+            .setTransactionTimeoutMs(5000)
+          new InitProducerIdRequest.Builder(requestData)
 
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
           new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
@@ -463,7 +464,7 @@
       case ApiKeys.FETCH => FetchResponse.parse(response).throttleTimeMs
       case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
       case ApiKeys.METADATA =>
-        new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
+        new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
      case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
       case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
       case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs
@@ -472,15 +473,15 @@
       case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs
       case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs
       case ApiKeys.DESCRIBE_GROUPS =>
-        new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
+        new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs
       case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
       case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
       case ApiKeys.CREATE_TOPICS =>
-        new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion()).throttleTimeMs
+        new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion).throttleTimeMs
       case ApiKeys.DELETE_TOPICS =>
-        new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion()).throttleTimeMs
+        new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion).throttleTimeMs
       case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
-      case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs
+      case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response, ApiKeys.INIT_PRODUCER_ID.latestVersion).throttleTimeMs
       case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
       case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
       case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs