diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 6189aae1736..33bc4966c52 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; @@ -480,7 +481,10 @@ public class Sender implements Runnable { private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException { String nodeId = node.idString(); - InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null); + InitProducerIdRequestData requestData = new InitProducerIdRequestData() + .setTransactionalId(null) + .setTransactionTimeoutMs(Integer.MAX_VALUE); + InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(requestData); ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, requestTimeoutMs, null); return NetworkClientUtils.sendAndReceive(client, request, time); } @@ -504,7 +508,7 @@ public class Sender implements Runnable { Errors error = initProducerIdResponse.error(); if (error == Errors.NONE) { ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch( - initProducerIdResponse.producerId(), initProducerIdResponse.epoch()); + initProducerIdResponse.data.producerId(), initProducerIdResponse.data.producerEpoch()); transactionManager.setProducerIdAndEpoch(producerIdAndEpoch); return; } else if (error.exception() instanceof RetriableException) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index b6190932661..b34cc98de31 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.RecordBatch; @@ -254,8 +255,10 @@ public class TransactionManager { return handleCachedTransactionRequestResult(() -> { transitionTo(State.INITIALIZING); setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); - InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); - InitProducerIdHandler handler = new InitProducerIdHandler(builder); + InitProducerIdRequestData requestData = new InitProducerIdRequestData() + .setTransactionalId(transactionalId) + .setTransactionTimeoutMs(transactionTimeoutMs); + InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData)); enqueueRequest(handler); return handler.result; }, State.INITIALIZING); @@ -1020,7 +1023,8 @@ public class TransactionManager { Errors error = initProducerIdResponse.error(); if (error == Errors.NONE) { - ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch()); + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data.producerId(), + initProducerIdResponse.data.producerEpoch()); setProducerIdAndEpoch(producerIdAndEpoch); transitionTo(State.READY); lastError = null; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index f49c99a4519..33d673615bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -26,6 +26,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.InitProducerIdRequestData; +import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupRequestData; @@ -81,8 +83,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; -import org.apache.kafka.common.requests.InitProducerIdRequest; -import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.LeaderAndIsrRequest; import org.apache.kafka.common.requests.LeaderAndIsrResponse; import org.apache.kafka.common.requests.ListGroupsRequest; @@ -155,8 +155,7 @@ public enum ApiKeys { CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS), DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS), DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()), - INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(), - InitProducerIdResponse.schemaVersions()), + INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequestData.SCHEMAS, InitProducerIdResponseData.SCHEMAS), OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false, OffsetsForLeaderEpochRequest.schemaVersions(), OffsetsForLeaderEpochResponse.schemaVersions()), ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 239024f8632..c069bc930a3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -76,11 +76,13 @@ public abstract class AbstractRequest extends AbstractRequestResponse { } private final short version; + public final ApiKeys api; public AbstractRequest(ApiKeys api, short version) { if (!api.isVersionSupported(version)) throw new UnsupportedVersionException("The " + api + " protocol does not support version " + version); this.version = version; + this.api = api; } /** diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index f594f207efe..50ae0b5fd49 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -115,7 +115,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case DELETE_RECORDS: return new DeleteRecordsResponse(struct); case INIT_PRODUCER_ID: - return new InitProducerIdResponse(struct); + return new InitProducerIdResponse(struct, version); case OFFSET_FOR_LEADER_EPOCH: return new OffsetsForLeaderEpochResponse(struct); case ADD_PARTITIONS_TO_TXN: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java index 93f7ab269a4..a2cd17d9c7a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java @@ -17,12 +17,12 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.errors.UnsupportedVersionException; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -50,7 +50,6 @@ public class CreateTopicsRequest extends AbstractRequest { } private final CreateTopicsRequestData data; - private final short version; public static final int NO_NUM_PARTITIONS = -1; public static final short NO_REPLICATION_FACTOR = -1; @@ -58,13 +57,11 @@ public class CreateTopicsRequest extends AbstractRequest { private CreateTopicsRequest(CreateTopicsRequestData data, short version) { super(ApiKeys.CREATE_TOPICS, version); this.data = data; - this.version = version; } public CreateTopicsRequest(Struct struct, short version) { super(ApiKeys.CREATE_TOPICS, version); this.data = new CreateTopicsRequestData(struct, version); - this.version = version; } public CreateTopicsRequestData data() { @@ -96,6 +93,6 @@ public class CreateTopicsRequest extends AbstractRequest { */ @Override public Struct toStruct() { - return data.toStruct(version); + return data.toStruct(version()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java index 04bfee8d740..ed417c57730 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java @@ -86,14 +86,14 @@ public class DescribeAclsRequest extends AbstractRequest { private final AclBindingFilter filter; DescribeAclsRequest(AclBindingFilter filter, short version) { - super(ApiKeys.DELETE_ACLS, version); + super(ApiKeys.DESCRIBE_ACLS, version); this.filter = filter; validate(filter, version); } public DescribeAclsRequest(Struct struct, short version) { - super(ApiKeys.DELETE_ACLS, version); + super(ApiKeys.DESCRIBE_ACLS, version); ResourcePatternFilter resourceFilter = RequestUtils.resourcePatternFilterFromStructFields(struct); AccessControlEntryFilter entryFilter = RequestUtils.aceFilterFromStructFields(struct); this.filter = new AclBindingFilter(resourceFilter, entryFilter); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java index aab7c72cf43..8351c215983 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java @@ -16,106 +16,71 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.InitProducerIdRequestData; +import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.RecordBatch; import java.nio.ByteBuffer; -import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID; -import static org.apache.kafka.common.protocol.types.Type.INT32; - public class InitProducerIdRequest extends AbstractRequest { - public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE; - - private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms"; - - private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema( - NULLABLE_TRANSACTIONAL_ID, - new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer.")); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema INIT_PRODUCER_ID_REQUEST_V1 = INIT_PRODUCER_ID_REQUEST_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{INIT_PRODUCER_ID_REQUEST_V0, INIT_PRODUCER_ID_REQUEST_V1}; - } - - private final String transactionalId; - private final int transactionTimeoutMs; - public static class Builder extends AbstractRequest.Builder { - private final String transactionalId; - private final int transactionTimeoutMs; + private final InitProducerIdRequestData data; - public Builder(String transactionalId) { - this(transactionalId, NO_TRANSACTION_TIMEOUT_MS); - } - - public Builder(String transactionalId, int transactionTimeoutMs) { + public Builder(InitProducerIdRequestData data) { super(ApiKeys.INIT_PRODUCER_ID); - - if (transactionTimeoutMs <= 0) - throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs); - - if (transactionalId != null && transactionalId.isEmpty()) - throw new IllegalArgumentException("Must set either a null or a non-empty transactional id."); - - this.transactionalId = transactionalId; - this.transactionTimeoutMs = transactionTimeoutMs; + this.data = data; } @Override public InitProducerIdRequest build(short version) { - return new InitProducerIdRequest(version, transactionalId, transactionTimeoutMs); + if (data.transactionTimeoutMs() <= 0) + throw new IllegalArgumentException("transaction timeout value is not positive: " + data.transactionTimeoutMs()); + + if (data.transactionalId() != null && data.transactionalId().isEmpty()) + throw new IllegalArgumentException("Must set either a null or a non-empty transactional id."); + + return new InitProducerIdRequest(data, version); } @Override public String toString() { - return "(type=InitProducerIdRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" + - transactionTimeoutMs + ")"; + return data.toString(); } } + public final InitProducerIdRequestData data; + + private InitProducerIdRequest(InitProducerIdRequestData data, short version) { + super(ApiKeys.INIT_PRODUCER_ID, version); + this.data = data; + } + public InitProducerIdRequest(Struct struct, short version) { super(ApiKeys.INIT_PRODUCER_ID, version); - this.transactionalId = struct.get(NULLABLE_TRANSACTIONAL_ID); - this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME); + this.data = new InitProducerIdRequestData(struct, version); } - private InitProducerIdRequest(short version, String transactionalId, int transactionTimeoutMs) { - super(ApiKeys.INIT_PRODUCER_ID, version); - this.transactionalId = transactionalId; - this.transactionTimeoutMs = transactionTimeoutMs; - } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new InitProducerIdResponse(throttleTimeMs, Errors.forException(e)); + InitProducerIdResponseData response = new InitProducerIdResponseData() + .setErrorCode(Errors.forException(e).code()) + .setProducerId(RecordBatch.NO_PRODUCER_ID) + .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH) + .setThrottleTimeMs(0); + return new InitProducerIdResponse(response); } public static InitProducerIdRequest parse(ByteBuffer buffer, short version) { return new InitProducerIdRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version); } - public String transactionalId() { - return transactionalId; - } - - public int transactionTimeoutMs() { - return transactionTimeoutMs; - } - @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version())); - struct.set(NULLABLE_TRANSACTIONAL_ID, transactionalId); - struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs); - return struct; + return data.toStruct(version()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java index 9a1e0f744cd..a33daf3e2d2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java @@ -16,110 +16,59 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.record.RecordBatch; import java.nio.ByteBuffer; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; - +/** + * Possible error codes: + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED} + * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED} + */ public class InitProducerIdResponse extends AbstractResponse { - // Possible error codes: - // NotCoordinator - // CoordinatorNotAvailable - // CoordinatorLoadInProgress - // TransactionalIdAuthorizationFailed - // ClusterAuthorizationFailed + public final InitProducerIdResponseData data; - private static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema( - THROTTLE_TIME_MS, - ERROR_CODE, - PRODUCER_ID, - PRODUCER_EPOCH); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema INIT_PRODUCER_ID_RESPONSE_V1 = INIT_PRODUCER_ID_RESPONSE_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0, INIT_PRODUCER_ID_RESPONSE_V1}; + public InitProducerIdResponse(InitProducerIdResponseData data) { + this.data = data; } - private final int throttleTimeMs; - private final Errors error; - private final long producerId; - private final short epoch; - - public InitProducerIdResponse(int throttleTimeMs, Errors error, long producerId, short epoch) { - this.throttleTimeMs = throttleTimeMs; - this.error = error; - this.producerId = producerId; - this.epoch = epoch; - } - - public InitProducerIdResponse(Struct struct) { - this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); - this.error = Errors.forCode(struct.get(ERROR_CODE)); - this.producerId = struct.get(PRODUCER_ID); - this.epoch = struct.get(PRODUCER_EPOCH); - } - - public InitProducerIdResponse(int throttleTimeMs, Errors errors) { - this(throttleTimeMs, errors, RecordBatch.NO_PRODUCER_ID, (short) 0); + public InitProducerIdResponse(Struct struct, short version) { + this.data = new InitProducerIdResponseData(struct, version); } @Override public int throttleTimeMs() { - return throttleTimeMs; - } - - public long producerId() { - return producerId; - } - - public Errors error() { - return error; + return data.throttleTimeMs(); } @Override public Map errorCounts() { - return errorCounts(error); - } - - public short epoch() { - return epoch; + return errorCounts(Errors.forCode(data.errorCode())); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version)); - struct.set(THROTTLE_TIME_MS, throttleTimeMs); - struct.set(PRODUCER_ID, producerId); - struct.set(PRODUCER_EPOCH, epoch); - struct.set(ERROR_CODE, error.code()); - return struct; + return data.toStruct(version); } public static InitProducerIdResponse parse(ByteBuffer buffer, short version) { - return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer)); + return new InitProducerIdResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer), version); } @Override public String toString() { - return "InitProducerIdResponse(" + - "error=" + error + - ", producerId=" + producerId + - ", producerEpoch=" + epoch + - ", throttleTimeMs=" + throttleTimeMs + - ')'; + return data.toString(); + } + + public Errors error() { + return Errors.forCode(data.errorCode()); } @Override diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json index 8bf2ce35748..c8ca110ac09 100644 --- a/clients/src/main/resources/common/message/InitProducerIdRequest.json +++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json @@ -23,6 +23,6 @@ { "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The transactional id, or null if the producer is not transactional." }, { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+", - "about": "The time in ms to wait for before aborting idle transactions sent by this producer." } + "about": "The time in ms to wait for before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." } ] } diff --git a/clients/src/main/resources/common/message/InitProducerIdResponse.json b/clients/src/main/resources/common/message/InitProducerIdResponse.json index b251051ec90..a52fc81b1ed 100644 --- a/clients/src/main/resources/common/message/InitProducerIdResponse.json +++ b/clients/src/main/resources/common/message/InitProducerIdResponse.json @@ -25,7 +25,7 @@ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ProducerId", "type": "int64", "versions": "0+", - "about": "The current producer id." }, + "default": -1, "about": "The current producer id." }, { "name": "ProducerEpoch", "type": "int16", "versions": "0+", "about": "The current epoch associated with the producer id." } ] diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 4cbdaa25fa0..d397fd46a59 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -108,7 +109,6 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.spy; public class SenderTest { - private static final int MAX_REQUEST_SIZE = 1024 * 1024; private static final short ACKS_ALL = -1; private static final String CLIENT_ID = "clientId"; @@ -2321,15 +2321,22 @@ public class SenderTest { if (error != Errors.NONE) producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; - client.prepareResponse(new MockClient.RequestMatcher() { - @Override - public boolean matches(AbstractRequest body) { - return body instanceof InitProducerIdRequest && ((InitProducerIdRequest) body).transactionalId() == null; - } - }, new InitProducerIdResponse(0, error, producerId, producerEpoch)); + client.prepareResponse(body -> { + return body instanceof InitProducerIdRequest && + ((InitProducerIdRequest) body).data.transactionalId() == null; + }, initProducerIdResponse(producerId, producerEpoch, error)); sender.run(time.milliseconds()); } + private InitProducerIdResponse initProducerIdResponse(long producerId, short producerEpoch, Errors error) { + InitProducerIdResponseData responseData = new InitProducerIdResponseData() + .setErrorCode(error.code()) + .setProducerEpoch(producerEpoch) + .setProducerId(producerId) + .setThrottleTimeMs(0); + return new InitProducerIdResponse(responseData); + } + private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) { transactionManager.initializeTransactions(); prepareFindCoordinatorResponse(Errors.NONE); @@ -2345,8 +2352,8 @@ public class SenderTest { client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0))); } - private void prepareInitPidResponse(Errors error, long pid, short epoch) { - client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch)); + private void prepareInitPidResponse(Errors error, long producerId, short producerEpoch) { + client.prepareResponse(initProducerIdResponse(producerId, producerEpoch, error)); } private void assertFutureFailure(Future future, Class expectedExceptionType) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 97f7f5d23b6..1c47b9d344c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -703,8 +704,8 @@ public class TransactionManagerTest { client.prepareUnsupportedVersionResponse(body -> { InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body; - assertEquals(initProducerIdRequest.transactionalId(), transactionalId); - assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs); + assertEquals(initProducerIdRequest.data.transactionalId(), transactionalId); + assertEquals(initProducerIdRequest.data.transactionTimeoutMs(), transactionTimeoutMs); return true; }); @@ -2381,21 +2382,26 @@ public class TransactionManagerTest { }, new FindCoordinatorResponse(error, brokerNode), shouldDisconnect); } - private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long pid, short epoch) { + private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) { + InitProducerIdResponseData responseData = new InitProducerIdResponseData() + .setErrorCode(error.code()) + .setProducerEpoch(producerEpoch) + .setProducerId(producerId) + .setThrottleTimeMs(0); client.prepareResponse(body -> { InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body; - assertEquals(initProducerIdRequest.transactionalId(), transactionalId); - assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs); + assertEquals(initProducerIdRequest.data.transactionalId(), transactionalId); + assertEquals(initProducerIdRequest.data.transactionTimeoutMs(), transactionTimeoutMs); return true; - }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect); + }, new InitProducerIdResponse(responseData), shouldDisconnect); } - private void sendProduceResponse(Errors error, final long pid, final short epoch) { - client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0)); + private void sendProduceResponse(Errors error, final long producerId, final short producerEpoch) { + client.respond(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0)); } - private void prepareProduceResponse(Errors error, final long pid, final short epoch) { - client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0)); + private void prepareProduceResponse(Errors error, final long producerId, final short producerEpoch) { + client.prepareResponse(produceRequestMatcher(producerId, producerEpoch), produceResponse(tp0, 0, error, 0)); } private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) { return body -> { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index dfdc3233910..ca695c7553f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -52,6 +52,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPar import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.InitProducerIdRequestData; +import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupRequestData; @@ -389,30 +391,39 @@ public class RequestResponseTest { checkResponse(req.getErrorResponse(e), req.version()); } - private void checkRequest(AbstractRequest req) throws Exception { + private void checkRequest(AbstractRequest req) { // Check that we can serialize, deserialize and serialize again // We don't check for equality or hashCode because it is likely to fail for any request containing a HashMap checkRequest(req, false); } - private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) throws Exception { + private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) { // Check that we can serialize, deserialize and serialize again // Check for equality and hashCode only if indicated - Struct struct = req.toStruct(); - AbstractRequest deserialized = (AbstractRequest) deserialize(req, struct, req.version()); - Struct struct2 = deserialized.toStruct(); - if (checkEqualityAndHashCode) { - assertEquals(struct, struct2); - assertEquals(struct.hashCode(), struct2.hashCode()); + try { + Struct struct = req.toStruct(); + AbstractRequest deserialized = AbstractRequest.parseRequest(req.api, req.version(), struct); + Struct struct2 = deserialized.toStruct(); + if (checkEqualityAndHashCode) { + assertEquals(struct, struct2); + assertEquals(struct.hashCode(), struct2.hashCode()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize request " + req + " with type " + req.getClass(), e); } } private void checkResponse(AbstractResponse response, int version) throws Exception { // Check that we can serialize, deserialize and serialize again // We don't check for equality or hashCode because it is likely to fail for any response containing a HashMap - Struct struct = response.toStruct((short) version); - AbstractResponse deserialized = (AbstractResponse) deserialize(response, struct, (short) version); - Struct struct2 = deserialized.toStruct((short) version); + try { + Struct struct = response.toStruct((short) version); + AbstractResponse deserialized = (AbstractResponse) deserialize(response, struct, (short) version); + Struct struct2 = deserialized.toStruct((short) version); + assertEquals(struct2, struct); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize response " + response + " with type " + response.getClass(), e); + } } private AbstractRequestResponse deserialize(AbstractRequestResponse req, Struct struct, short version) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { @@ -1167,14 +1178,21 @@ public class RequestResponseTest { } private InitProducerIdRequest createInitPidRequest() { - return new InitProducerIdRequest.Builder(null, 100).build(); + InitProducerIdRequestData requestData = new InitProducerIdRequestData() + .setTransactionalId(null) + .setTransactionTimeoutMs(100); + return new InitProducerIdRequest.Builder(requestData).build(); } private InitProducerIdResponse createInitPidResponse() { - return new InitProducerIdResponse(0, Errors.NONE, 3332, (short) 3); + InitProducerIdResponseData responseData = new InitProducerIdResponseData() + .setErrorCode(Errors.NONE.code()) + .setProducerEpoch((short) 3) + .setProducerId(3332) + .setThrottleTimeMs(0); + return new InitProducerIdResponse(responseData); } - private OffsetsForLeaderEpochRequest createLeaderEpochRequest() { Map epochs = new HashMap<>(); diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7d12fe331d8..140fdd4b817 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -45,16 +45,9 @@ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic -import org.apache.kafka.common.message.CreateTopicsResponseData import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet} -import org.apache.kafka.common.message.DeleteTopicsResponseData +import org.apache.kafka.common.message.{CreateTopicsResponseData, DeleteTopicsResponseData, DescribeGroupsResponseData, ElectPreferredLeadersResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData} import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet} -import org.apache.kafka.common.message.DescribeGroupsResponseData -import org.apache.kafka.common.message.ElectPreferredLeadersResponseData -import org.apache.kafka.common.message.JoinGroupResponseData -import org.apache.kafka.common.message.LeaveGroupResponseData -import org.apache.kafka.common.message.SaslAuthenticateResponseData -import org.apache.kafka.common.message.SaslHandshakeResponseData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -1680,7 +1673,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = { val initProducerIdRequest = request.body[InitProducerIdRequest] - val transactionalId = initProducerIdRequest.transactionalId + val transactionalId = initProducerIdRequest.data.transactionalId if (transactionalId != null) { if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL))) { @@ -1694,13 +1687,18 @@ class KafkaApis(val requestChannel: RequestChannel, def sendResponseCallback(result: InitProducerIdResult): Unit = { def createResponse(requestThrottleMs: Int): AbstractResponse = { - val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch) + val responseData = new InitProducerIdResponseData() + .setProducerId(result.producerId) + .setProducerEpoch(result.producerEpoch) + .setThrottleTimeMs(requestThrottleMs) + .setErrorCode(result.error.code) + val responseBody = new InitProducerIdResponse(responseData) trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.") responseBody } sendResponseMaybeThrottle(request, createResponse) } - txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback) + txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.data.transactionTimeoutMs, sendResponseCallback) } def handleEndTxnRequest(request: RequestChannel.Request): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index d04f39f9c83..05f4bcd9fb2 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -24,13 +24,11 @@ import kafka.security.auth._ import kafka.utils.TestUtils import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData} +import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, InitProducerIdRequestData, JoinGroupRequestData, LeaveGroupRequestData, SaslAuthenticateRequestData, SaslHandshakeRequestData} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.message.ControlledShutdownRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet} -import org.apache.kafka.common.message.SaslAuthenticateRequestData -import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys @@ -321,7 +319,10 @@ class RequestQuotaTest extends BaseRequestTest { new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava) case ApiKeys.INIT_PRODUCER_ID => - new InitProducerIdRequest.Builder("abc") + val requestData = new InitProducerIdRequestData() + .setTransactionalId("test-transactional-id") + .setTransactionTimeoutMs(5000) + new InitProducerIdRequest.Builder(requestData) case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, @@ -463,7 +464,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.FETCH => FetchResponse.parse(response).throttleTimeMs case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs case ApiKeys.METADATA => - new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs + new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs @@ -472,15 +473,15 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs case ApiKeys.DESCRIBE_GROUPS => - new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs + new DescribeGroupsResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion).throttleTimeMs case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs case ApiKeys.CREATE_TOPICS => - new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion()).throttleTimeMs + new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion).throttleTimeMs case ApiKeys.DELETE_TOPICS => - new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion()).throttleTimeMs + new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion).throttleTimeMs case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs - case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs + case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response, ApiKeys.INIT_PRODUCER_ID.latestVersion).throttleTimeMs case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs