KAFKA-4990; Request/response classes for transactions (KIP-98)

Author: Matthias J. Sax <matthias@confluent.io>
Author: Guozhang Wang <wangguoz@gmail.com>
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2799 from mjsax/kafka-4990-add-api-stub-config-parameters-request-types
Matthias J. Sax, 2017-04-07 11:22:09 +01:00 (committed by Ismael Juma)
parent 2f4f3b957d
commit 865d82af2c
25 changed files with 1597 additions and 47 deletions

checkstyle/suppressions.xml

@ -13,6 +13,10 @@
files=".*/protocol/Errors.java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/common/utils/Utils.java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/requests/AbstractRequest.java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/requests/AbstractResponse.java"/>
<suppress checks="MethodLength"
files="KerberosLogin.java"/>

org/apache/kafka/clients/producer/Producer.java

@ -16,16 +16,16 @@
*/
package org.apache.kafka.clients.producer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.MetricName;
/**
* The interface for the {@link KafkaProducer}

org/apache/kafka/common/errors/InvalidPidMappingException.java

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
public class InvalidPidMappingException extends ApiException {
public InvalidPidMappingException(String message) {
super(message);
}
}

org/apache/kafka/common/errors/InvalidTxnStateException.java

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
public class InvalidTxnStateException extends ApiException {
public InvalidTxnStateException(String message) {
super(message);
}
}

org/apache/kafka/common/protocol/ApiKeys.java

@ -48,7 +48,12 @@ public enum ApiKeys {
DELETE_TOPICS(20, "DeleteTopics"),
DELETE_RECORDS(21, "DeleteRecords"),
INIT_PRODUCER_ID(22, "InitProducerId"),
-OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch");
+OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch"),
+ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn"),
+ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn"),
+END_TXN(26, "EndTxn"),
+WRITE_TXN_MARKERS(27, "WriteTxnMarkers"),
+TXN_OFFSET_COMMIT(28, "TxnOffsetCommit");
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;

org/apache/kafka/common/protocol/Errors.java

@ -16,9 +16,6 @@
*/
package org.apache.kafka.common.protocol;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
@ -36,17 +33,16 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.ProducerFencedException;
-import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
@ -55,23 +51,29 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
/**
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
@ -169,10 +171,18 @@ public enum Errors {
" the message was sent to an incompatible broker. See the broker logs for more details.")),
UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
-POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
-OUT_OF_ORDER_SEQUENCE_NUMBER(45, new OutOfOrderSequenceException("The broker received an out of order sequence number")),
-DUPLICATE_SEQUENCE_NUMBER(46, new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
-PRODUCER_FENCED(47, new ProducerFencedException("Producer attempted an operation with an old epoch"));
+POLICY_VIOLATION(44,
+new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
+OUT_OF_ORDER_SEQUENCE_NUMBER(45,
+new OutOfOrderSequenceException("The broker received an out of order sequence number")),
+DUPLICATE_SEQUENCE_NUMBER(46,
+new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
+INVALID_PRODUCER_EPOCH(47,
+new ProducerFencedException("Producer attempted an operation with an old epoch")),
+INVALID_TXN_STATE(48,
+new InvalidTxnStateException("The producer attempted a transactional operation in an invalid state")),
+INVALID_PID_MAPPING(49,
+new InvalidPidMappingException("The PID mapping is invalid"));
private static final Logger log = LoggerFactory.getLogger(Errors.class);
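
The new codes take part in the same two-way mapping as the existing entries: Errors.forCode(short) resolves a wire code to its enum constant, and Errors.forException(Throwable) maps a thrown exception back to a code. A minimal sketch (illustrative only, not part of the diff):

    short code = Errors.INVALID_TXN_STATE.code();          // 48 on the wire
    Errors error = Errors.forCode(code);                   // back to INVALID_TXN_STATE
    Errors mapped = Errors.forException(
            new InvalidPidMappingException("no mapping")); // INVALID_PID_MAPPING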

org/apache/kafka/common/protocol/Protocol.java

@ -647,7 +647,7 @@ public class Protocol {
// The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the
// last stable offset). It also exposes messages with magic v2 (along with older formats).
private static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V4 = new Schema(
new Field("pid", INT64, "The producer ID (PID) associated with the aborted transactions"),
new Field("producer_id", INT64, "The producer id associated with the aborted transactions"),
new Field("first_offset", INT64, "The first offset in the aborted transaction"));
public static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4;
@ -1180,19 +1180,19 @@ public class Protocol {
public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
new Field("transactional_id",
NULLABLE_STRING,
"The transactional id whose pid we want to retrieve or generate.")
"The transactional id whose producer id we want to retrieve or generate.")
);
public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
new Field("error_code",
INT16,
"An integer error code."),
new Field("pid",
new Field("producer_id",
INT64,
"The pid for the input transactional id. If the input id was empty, then this is used only for ensuring idempotence of messages"),
new Field("epoch",
"The producer id for the input transactional id. If the input id was empty, then this is used only for ensuring idempotence of messages."),
new Field("producer_epoch",
INT16,
"The epoch for the pid. Will always be 0 if no transactional id was specified in the request.")
"The epoch for the producer id. Will always be 0 if no transactional id was specified in the request.")
);
public static final Schema[] INIT_PRODUCER_ID_REQUEST = new Schema[] {INIT_PRODUCER_ID_REQUEST_V0};
@ -1249,6 +1249,169 @@ public class Protocol {
public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = new Schema[] {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = new Schema[] {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
public static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
new Field("transactional_id",
STRING,
"The transactional id corresponding to the transaction."),
new Field("producer_id",
INT64,
"Current producer id in use by the transactional id."),
new Field("producer_epoch",
INT16,
"Current epoch associated with the producer id."),
new Field("topics",
new ArrayOf(new Schema(
new Field("topic", STRING),
new Field("partitions", new ArrayOf(INT32)))),
"The partitions to add to the transaction.")
);
public static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema(
new Field("error_code",
INT16,
"An integer error code.")
);
public static final Schema[] ADD_PARTITIONS_TO_TXN_REQUEST = new Schema[] {ADD_PARTITIONS_TO_TXN_REQUEST_V0};
public static final Schema[] ADD_PARTITIONS_TO_TXN_RESPONSE = new Schema[] {ADD_PARTITIONS_TO_TXN_RESPONSE_V0};
public static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema(
new Field("transactional_id",
STRING,
"The transactional id corresponding to the transaction."),
new Field("producer_id",
INT64,
"Current producer id in use by the transactional id."),
new Field("producer_epoch",
INT16,
"Current epoch associated with the producer id."),
new Field("consumer_group_id",
STRING,
"Consumer group id whose offsets should be included in the transaction.")
);
public static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema(
new Field("error_code",
INT16,
"An integer error code.")
);
public static final Schema[] ADD_OFFSETS_TO_TXN_REQUEST = new Schema[] {ADD_OFFSETS_TO_TXN_REQUEST_V0};
public static final Schema[] ADD_OFFSETS_TO_TXN_RESPONSE = new Schema[] {ADD_OFFSETS_TO_TXN_RESPONSE_V0};
public static final Schema END_TXN_REQUEST_V0 = new Schema(
new Field("transactional_id",
STRING,
"The transactional id corresponding to the transaction."),
new Field("producer_id",
INT64,
"Current producer id in use by the transactional id."),
new Field("producer_epoch",
INT16,
"Current epoch associated with the producer id."),
new Field("transaction_result",
BOOLEAN,
"The result of the transaction (0 = ABORT, 1 = COMMIT)")
);
public static final Schema END_TXN_RESPONSE_V0 = new Schema(
new Field("error_code",
INT16,
"An integer error code.")
);
public static final Schema[] END_TXN_REQUEST = new Schema[] {END_TXN_REQUEST_V0};
public static final Schema[] END_TXN_RESPONSE = new Schema[] {END_TXN_RESPONSE_V0};
public static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema(
new Field("producer_id",
INT64,
"Current producer id in use by the transactional id."),
new Field("producer_epoch",
INT16,
"Current epoch associated with the producer id."),
new Field("transaction_result",
BOOLEAN,
"The result of the transaction to write to the partitions (false = ABORT, true = COMMIT)."),
new Field("topics",
new ArrayOf(new Schema(
new Field("topic", STRING),
new Field("partitions", new ArrayOf(INT32)))),
"The partitions to write markers for.")
);
public static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
new Field("coordinator_epoch",
INT32,
"Epoch associated with the transaction state partition hosted by this transaction coordinator."),
new Field("transaction_markers",
new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0),
"The transaction markers to be written.")
);
public static final Schema WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0 = new Schema(
new Field("partition", INT32),
new Field("error_code", INT16)
);
public static final Schema WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0 = new Schema(
new Field("producer_id",
INT64,
"Current producer id in use by the transactional id."),
new Field("topics",
new ArrayOf(new Schema(
new Field("topic", STRING),
new Field("partitions", new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0)))),
"Errors per partition from writing markers.")
);
public static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema(
new Field("transaction_markers", new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers.")
);
public static final Schema[] WRITE_TXN_REQUEST = new Schema[] {WRITE_TXN_MARKERS_REQUEST_V0};
public static final Schema[] WRITE_TXN_RESPONSE = new Schema[] {WRITE_TXN_MARKERS_RESPONSE_V0};
public static final Schema TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0 = new Schema(
new Field("partition", INT32),
new Field("offset", INT64),
new Field("metadata", NULLABLE_STRING)
);
public static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema(
new Field("consumer_group_id",
STRING,
"Id of the associated consumer group to commit offsets for."),
new Field("producer_id",
INT64,
"Current producer id in use by the transactional id."),
new Field("producer_epoch",
INT16,
"Current epoch associated with the producer id."),
new Field("retention_time",
INT64,
"The time in ms to retain the offset."),
new Field("topics",
new ArrayOf(new Schema(
new Field("topic", STRING),
new Field("partitions", new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0)))),
"The partitions to write markers for.")
);
public static final Schema TXN_OFFSET_COMMIT_PARTITION_ERROR_RESPONSE_V0 = new Schema(
new Field("partition", INT32),
new Field("error_code", INT16)
);
public static final Schema TXN_OFFSET_COMMIT_RESPONSE_V0 = new Schema(
new Field("topics",
new ArrayOf(new Schema(
new Field("topic", STRING),
new Field("partitions", new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_ERROR_RESPONSE_V0)))),
"Errors per partition from writing markers.")
);
public static final Schema[] TXN_OFFSET_COMMIT_REQUEST = new Schema[] {TXN_OFFSET_COMMIT_REQUEST_V0};
public static final Schema[] TXN_OFFSET_COMMIT_RESPONSE = new Schema[] {TXN_OFFSET_COMMIT_RESPONSE_V0};
/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@ -1283,6 +1446,11 @@ public class Protocol {
REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
REQUESTS[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_REQUEST;
REQUESTS[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_REQUEST;
REQUESTS[ApiKeys.ADD_PARTITIONS_TO_TXN.id] = ADD_PARTITIONS_TO_TXN_REQUEST;
REQUESTS[ApiKeys.ADD_OFFSETS_TO_TXN.id] = ADD_OFFSETS_TO_TXN_REQUEST;
REQUESTS[ApiKeys.END_TXN.id] = END_TXN_REQUEST;
REQUESTS[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_REQUEST;
REQUESTS[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@ -1308,6 +1476,11 @@ public class Protocol {
RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
RESPONSES[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_RESPONSE;
RESPONSES[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_RESPONSE;
RESPONSES[ApiKeys.ADD_PARTITIONS_TO_TXN.id] = ADD_PARTITIONS_TO_TXN_RESPONSE;
RESPONSES[ApiKeys.ADD_OFFSETS_TO_TXN.id] = ADD_OFFSETS_TO_TXN_RESPONSE;
RESPONSES[ApiKeys.END_TXN.id] = END_TXN_RESPONSE;
RESPONSES[ApiKeys.WRITE_TXN_MARKERS.id] = WRITE_TXN_RESPONSE;
RESPONSES[ApiKeys.TXN_OFFSET_COMMIT.id] = TXN_OFFSET_COMMIT_RESPONSE;
/* set the minimum and maximum version of each api */
for (ApiKeys api : ApiKeys.values()) {

org/apache/kafka/common/record/ControlRecordType.java

@ -40,8 +40,8 @@ import java.nio.ByteBuffer;
* The schema for the value field is left to the control record type to specify.
*/
public enum ControlRecordType {
-COMMIT((short) 0),
-ABORT((short) 1),
+ABORT((short) 0),
+COMMIT((short) 1),
// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
UNKNOWN((short) -1);
@ -77,9 +77,9 @@ public enum ControlRecordType {
short type = key.getShort(2);
switch (type) {
case 0:
-return COMMIT;
+return ABORT;
case 1:
-return ABORT;
+return COMMIT;
default:
return UNKNOWN;
}
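
Given the key layout implied by the getShort(2) call above (an int16 version at offset 0 followed by the int16 type at offset 2), a control record key for a commit marker can be sketched as follows, assuming the surrounding method is ControlRecordType's key-parsing helper:

    ByteBuffer key = ByteBuffer.allocate(4);
    key.putShort((short) 0);  // key version
    key.putShort((short) 1);  // control type: 1 now decodes to COMMIT
    key.flip();
    // the switch above maps this key to COMMIT; an unrecognized type yields UNKNOWN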

org/apache/kafka/common/requests/AbstractRequest.java

@ -177,6 +177,21 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
case OFFSET_FOR_LEADER_EPOCH:
request = new OffsetsForLeaderEpochRequest(struct, version);
break;
case ADD_PARTITIONS_TO_TXN:
request = new AddPartitionsToTxnRequest(struct, version);
break;
case ADD_OFFSETS_TO_TXN:
request = new AddOffsetsToTxnRequest(struct, version);
break;
case END_TXN:
request = new EndTxnRequest(struct, version);
break;
case WRITE_TXN_MARKERS:
request = new WriteTxnMarkersRequest(struct, version);
break;
case TXN_OFFSET_COMMIT:
request = new TxnOffsetCommitRequest(struct, version);
break;
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));

org/apache/kafka/common/requests/AbstractResponse.java

@ -97,6 +97,16 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new InitPidResponse(struct);
case OFFSET_FOR_LEADER_EPOCH:
return new OffsetsForLeaderEpochResponse(struct);
case ADD_PARTITIONS_TO_TXN:
return new AddPartitionsToTxnResponse(struct);
case ADD_OFFSETS_TO_TXN:
return new AddOffsetsToTxnResponse(struct);
case END_TXN:
return new EndTxnResponse(struct);
case WRITE_TXN_MARKERS:
return new WriteTxnMarkersResponse(struct);
case TXN_OFFSET_COMMIT:
return new TxnOffsetCommitResponse(struct);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
"code should be updated to do so.", apiKey));

org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class AddOffsetsToTxnRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String PID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
private final String transactionalId;
private final long producerId;
private final short producerEpoch;
private final String consumerGroupId;
public Builder(String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
super(ApiKeys.ADD_OFFSETS_TO_TXN);
this.transactionalId = transactionalId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.consumerGroupId = consumerGroupId;
}
@Override
public AddOffsetsToTxnRequest build(short version) {
return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId);
}
}
private final String transactionalId;
private final long producerId;
private final short producerEpoch;
private final String consumerGroupId;
private AddOffsetsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch, String consumerGroupId) {
super(version);
this.transactionalId = transactionalId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.consumerGroupId = consumerGroupId;
}
public AddOffsetsToTxnRequest(Struct struct, short version) {
super(version);
this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
this.producerId = struct.getLong(PID_KEY_NAME);
this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
}
public String transactionalId() {
return transactionalId;
}
public long producerId() {
return producerId;
}
public short producerEpoch() {
return producerEpoch;
}
public String consumerGroupId() {
return consumerGroupId;
}
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version()));
struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
struct.set(PID_KEY_NAME, producerId);
struct.set(EPOCH_KEY_NAME, producerEpoch);
struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
return struct;
}
@Override
public AddOffsetsToTxnResponse getErrorResponse(Throwable e) {
return new AddOffsetsToTxnResponse(Errors.forException(e));
}
public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) {
return new AddOffsetsToTxnRequest(ApiKeys.ADD_OFFSETS_TO_TXN.parseRequest(version, buffer), version);
}
}
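
A minimal usage sketch for the Builder defined above (all values illustrative, not part of the diff):

    AddOffsetsToTxnRequest request = new AddOffsetsToTxnRequest.Builder(
            "my-txn", 1234L, (short) 0, "my-group").build((short) 0);
    // version 0 is the only schema this patch defines; the getters echo the inputs,
    // e.g. request.consumerGroupId() returns "my-group"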

org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class AddOffsetsToTxnResponse extends AbstractResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
// NotCoordinator
// CoordinatorNotAvailable
// CoordinatorLoadInProgress
// InvalidPidMapping
// InvalidTxnState
// GroupAuthorizationFailed
private final Errors error;
public AddOffsetsToTxnResponse(Errors error) {
this.error = error;
}
public AddOffsetsToTxnResponse(Struct struct) {
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
}
public Errors error() {
return error;
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version));
struct.set(ERROR_CODE_KEY_NAME, error.code());
return struct;
}
public static AddOffsetsToTxnResponse parse(ByteBuffer buffer, short version) {
return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer));
}
}
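
Client-side handling of the single error code would look roughly like this (sketch; buffer is an assumed ByteBuffer holding the response body):

    AddOffsetsToTxnResponse response = AddOffsetsToTxnResponse.parse(buffer, (short) 0);
    if (response.error() != Errors.NONE)
        throw response.error().exception();  // e.g. InvalidTxnStateException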

org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class AddPartitionsToTxnRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String PID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partitions";
public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
private final String transactionalId;
private final long producerId;
private final short producerEpoch;
private final List<TopicPartition> partitions;
public Builder(String transactionalId, long producerId, short producerEpoch, List<TopicPartition> partitions) {
super(ApiKeys.ADD_PARTITIONS_TO_TXN);
this.transactionalId = transactionalId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.partitions = partitions;
}
@Override
public AddPartitionsToTxnRequest build(short version) {
return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
}
}
private final String transactionalId;
private final long producerId;
private final short producerEpoch;
private final List<TopicPartition> partitions;
private AddPartitionsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch,
List<TopicPartition> partitions) {
super(version);
this.transactionalId = transactionalId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.partitions = partitions;
}
public AddPartitionsToTxnRequest(Struct struct, short version) {
super(version);
this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
this.producerId = struct.getLong(PID_KEY_NAME);
this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
List<TopicPartition> partitions = new ArrayList<>();
Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
for (Object topicPartitionObj : topicPartitionsArray) {
Struct topicPartitionStruct = (Struct) topicPartitionObj;
String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
partitions.add(new TopicPartition(topic, (Integer) partitionObj));
}
}
this.partitions = partitions;
}
public String transactionalId() {
return transactionalId;
}
public long producerId() {
return producerId;
}
public short producerEpoch() {
return producerEpoch;
}
public List<TopicPartition> partitions() {
return partitions;
}
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.requestSchema(version()));
struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
struct.set(PID_KEY_NAME, producerId);
struct.set(EPOCH_KEY_NAME, producerEpoch);
Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(partitions);
Object[] partitionsArray = new Object[mappedPartitions.size()];
int i = 0;
for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
partitionsArray[i++] = topicPartitionsStruct;
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
return struct;
}
@Override
public AddPartitionsToTxnResponse getErrorResponse(Throwable e) {
return new AddPartitionsToTxnResponse(Errors.forException(e));
}
public static AddPartitionsToTxnRequest parse(ByteBuffer buffer, short version) {
return new AddPartitionsToTxnRequest(ApiKeys.ADD_PARTITIONS_TO_TXN.parseRequest(version, buffer), version);
}
}
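
A sketch of adding two partitions of one topic to a transaction (values illustrative; java.util.Arrays assumed); toStruct() then regroups the flat list by topic via CollectionUtils.groupDataByTopic:

    List<TopicPartition> partitions = Arrays.asList(
            new TopicPartition("my-topic", 0),
            new TopicPartition("my-topic", 1));
    AddPartitionsToTxnRequest request = new AddPartitionsToTxnRequest.Builder(
            "my-txn", 1234L, (short) 0, partitions).build((short) 0);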

org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class AddPartitionsToTxnResponse extends AbstractResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
// NotCoordinator
// CoordinatorNotAvailable
// CoordinatorLoadInProgress
// InvalidTxnState
// InvalidPidMapping
// TopicAuthorizationFailed
private final Errors error;
public AddPartitionsToTxnResponse(Errors error) {
this.error = error;
}
public AddPartitionsToTxnResponse(Struct struct) {
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
}
public Errors error() {
return error;
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version));
struct.set(ERROR_CODE_KEY_NAME, error.code());
return struct;
}
public static AddPartitionsToTxnResponse parse(ByteBuffer buffer, short version) {
return new AddPartitionsToTxnResponse(ApiKeys.ADD_PARTITIONS_TO_TXN.parseResponse(version, buffer));
}
}

org/apache/kafka/common/requests/EndTxnRequest.java

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class EndTxnRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String PID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
private final String transactionalId;
private final long producerId;
private final short producerEpoch;
private final TransactionResult result;
public Builder(String transactionalId, long producerId, short producerEpoch, TransactionResult result) {
super(ApiKeys.END_TXN);
this.transactionalId = transactionalId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.result = result;
}
@Override
public EndTxnRequest build(short version) {
return new EndTxnRequest(version, transactionalId, producerId, producerEpoch, result);
}
}
private final String transactionalId;
private final long producerId;
private final short producerEpoch;
private final TransactionResult result;
private EndTxnRequest(short version, String transactionalId, long producerId, short producerEpoch, TransactionResult result) {
super(version);
this.transactionalId = transactionalId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.result = result;
}
public EndTxnRequest(Struct struct, short version) {
super(version);
this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
this.producerId = struct.getLong(PID_KEY_NAME);
this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
this.result = TransactionResult.forId(struct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
}
public String transactionalId() {
return transactionalId;
}
public long producerId() {
return producerId;
}
public short producerEpoch() {
return producerEpoch;
}
public TransactionResult command() {
return result;
}
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.END_TXN.requestSchema(version()));
struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
struct.set(PID_KEY_NAME, producerId);
struct.set(EPOCH_KEY_NAME, producerEpoch);
struct.set(TRANSACTION_RESULT_KEY_NAME, result.id);
return struct;
}
@Override
public EndTxnResponse getErrorResponse(Throwable e) {
return new EndTxnResponse(Errors.forException(e));
}
public static EndTxnRequest parse(ByteBuffer buffer, short version) {
return new EndTxnRequest(ApiKeys.END_TXN.parseRequest(version, buffer), version);
}
}
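
Commit and abort are the same request with a different TransactionResult (sketch, values illustrative):

    EndTxnRequest commit = new EndTxnRequest.Builder(
            "my-txn", 1234L, (short) 0, TransactionResult.COMMIT).build((short) 0);
    // on the wire, transaction_result carries the boolean id: true = COMMIT, false = ABORT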

org/apache/kafka/common/requests/EndTxnResponse.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class EndTxnResponse extends AbstractResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
// NotCoordinator
// CoordinatorNotAvailable
// CoordinatorLoadInProgress
// InvalidTxnState
// InvalidPidMapping
private final Errors error;
public EndTxnResponse(Errors error) {
this.error = error;
}
public EndTxnResponse(Struct struct) {
this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
}
public Errors error() {
return error;
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.END_TXN.responseSchema(version));
struct.set(ERROR_CODE_KEY_NAME, error.code());
return struct;
}
public static EndTxnResponse parse(ByteBuffer buffer, short version) {
return new EndTxnResponse(ApiKeys.END_TXN.parseResponse(version, buffer));
}
}

org/apache/kafka/common/requests/FetchResponse.java

@ -56,7 +56,7 @@ public class FetchResponse extends AbstractResponse {
private static final String RECORD_SET_KEY_NAME = "record_set";
// aborted transaction field names
private static final String PID_KEY_NAME = "pid";
private static final String PID_KEY_NAME = "producer_id";
private static final String FIRST_OFFSET_KEY_NAME = "first_offset";
private static final int DEFAULT_THROTTLE_TIME = 0;
@ -78,11 +78,11 @@ public class FetchResponse extends AbstractResponse {
private final int throttleTimeMs;
public static final class AbortedTransaction {
-public final long pid;
+public final long producerId;
public final long firstOffset;
-public AbortedTransaction(long pid, long firstOffset) {
-this.pid = pid;
+public AbortedTransaction(long producerId, long firstOffset) {
+this.producerId = producerId;
this.firstOffset = firstOffset;
}
@ -95,19 +95,19 @@ public class FetchResponse extends AbstractResponse {
AbortedTransaction that = (AbortedTransaction) o;
-return pid == that.pid && firstOffset == that.firstOffset;
+return producerId == that.producerId && firstOffset == that.firstOffset;
}
@Override
public int hashCode() {
-int result = (int) (pid ^ (pid >>> 32));
+int result = (int) (producerId ^ (producerId >>> 32));
result = 31 * result + (int) (firstOffset ^ (firstOffset >>> 32));
return result;
}
@Override
public String toString() {
return "(pid=" + pid + ", firstOffset=" + firstOffset + ")";
return "(producerId=" + producerId + ", firstOffset=" + firstOffset + ")";
}
}
@ -211,9 +211,9 @@ public class FetchResponse extends AbstractResponse {
abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
for (Object abortedTransactionObj : abortedTransactionsArray) {
Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-long pid = abortedTransactionStruct.getLong(PID_KEY_NAME);
+long producerId = abortedTransactionStruct.getLong(PID_KEY_NAME);
long firstOffset = abortedTransactionStruct.getLong(FIRST_OFFSET_KEY_NAME);
-abortedTransactions.add(new AbortedTransaction(pid, firstOffset));
+abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
}
}
}
@ -339,7 +339,7 @@ public class FetchResponse extends AbstractResponse {
List<Struct> abortedTransactionStructs = new ArrayList<>(fetchPartitionData.abortedTransactions.size());
for (AbortedTransaction abortedTransaction : fetchPartitionData.abortedTransactions) {
Struct abortedTransactionStruct = partitionDataHeader.instance(ABORTED_TRANSACTIONS_KEY_NAME);
-abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.pid);
+abortedTransactionStruct.set(PID_KEY_NAME, abortedTransaction.producerId);
abortedTransactionStruct.set(FIRST_OFFSET_KEY_NAME, abortedTransaction.firstOffset);
abortedTransactionStructs.add(abortedTransactionStruct);
}
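
The rename is Java-side only apart from the wire key, which becomes the "producer_id" string now held by PID_KEY_NAME. A sketch of the renamed container (values illustrative):

    FetchResponse.AbortedTransaction txn = new FetchResponse.AbortedTransaction(1234L, 9876L);
    // txn.producerId == 1234L; txn.toString() -> "(producerId=1234, firstOffset=9876)"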

org/apache/kafka/common/requests/InitPidResponse.java

@ -29,8 +29,8 @@ public class InitPidResponse extends AbstractResponse {
* OK
*
*/
private static final String PRODUCER_ID_KEY_NAME = "pid";
private static final String EPOCH_KEY_NAME = "epoch";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String ERROR_CODE_KEY_NAME = "error_code";
private final Errors error;
private final long producerId;

org/apache/kafka/common/requests/TransactionResult.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
public enum TransactionResult {
ABORT(false), COMMIT(true);
public final boolean id;
TransactionResult(boolean id) {
this.id = id;
}
public static TransactionResult forId(boolean id) {
if (id) {
return TransactionResult.COMMIT;
}
return TransactionResult.ABORT;
}
}
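
The boolean id round-trips symmetrically, which EndTxnRequest and WriteTxnMarkersRequest rely on when reading transaction_result back from a Struct:

    assert TransactionResult.forId(TransactionResult.COMMIT.id) == TransactionResult.COMMIT;
    assert TransactionResult.forId(false) == TransactionResult.ABORT;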

org/apache/kafka/common/requests/TxnOffsetCommitRequest.java

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class TxnOffsetCommitRequest extends AbstractRequest {
private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
private static final String PID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String RETENTION_TIME_KEY_NAME = "retention_time";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partitions";
private static final String PARTITION_KEY_NAME = "partition";
private static final String OFFSET_KEY_NAME = "offset";
private static final String METADATA_KEY_NAME = "metadata";
public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> {
private final String consumerGroupId;
private final long producerId;
private final short producerEpoch;
private final long retentionTimeMs;
private final Map<TopicPartition, CommittedOffset> offsets;
public Builder(String consumerGroupId, long producerId, short producerEpoch, long retentionTimeMs,
Map<TopicPartition, CommittedOffset> offsets) {
super(ApiKeys.TXN_OFFSET_COMMIT);
this.consumerGroupId = consumerGroupId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.retentionTimeMs = retentionTimeMs;
this.offsets = offsets;
}
@Override
public TxnOffsetCommitRequest build(short version) {
return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, retentionTimeMs, offsets);
}
}
private final String consumerGroupId;
private final long producerId;
private final short producerEpoch;
private final long retentionTimeMs;
private final Map<TopicPartition, CommittedOffset> offsets;
public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch,
long retentionTimeMs, Map<TopicPartition, CommittedOffset> offsets) {
super(version);
this.consumerGroupId = consumerGroupId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.retentionTimeMs = retentionTimeMs;
this.offsets = offsets;
}
public TxnOffsetCommitRequest(Struct struct, short version) {
super(version);
this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
this.producerId = struct.getLong(PID_KEY_NAME);
this.producerEpoch = struct.getShort(EPOCH_KEY_NAME);
this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME);
Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
for (Object topicPartitionObj : topicPartitionsArray) {
Struct topicPartitionStruct = (Struct) topicPartitionObj;
String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
Struct partitionStruct = (Struct) partitionObj;
TopicPartition partition = new TopicPartition(topic, partitionStruct.getInt(PARTITION_KEY_NAME));
long offset = partitionStruct.getLong(OFFSET_KEY_NAME);
String metadata = partitionStruct.getString(METADATA_KEY_NAME);
offsets.put(partition, new CommittedOffset(offset, metadata));
}
}
this.offsets = offsets;
}
public String consumerGroupId() {
return consumerGroupId;
}
public long producerId() {
return producerId;
}
public short producerEpoch() {
return producerEpoch;
}
public long retentionTimeMs() {
return retentionTimeMs;
}
public Map<TopicPartition, CommittedOffset> offsets() {
return offsets;
}
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version()));
struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
struct.set(PID_KEY_NAME, producerId);
struct.set(EPOCH_KEY_NAME, producerEpoch);
struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs);
Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets);
Object[] partitionsArray = new Object[mappedPartitionOffsets.size()];
int i = 0;
for (Map.Entry<String, Map<Integer, CommittedOffset>> topicAndPartitions : mappedPartitionOffsets.entrySet()) {
Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
Map<Integer, CommittedOffset> partitionOffsets = topicAndPartitions.getValue();
Object[] partitionOffsetsArray = new Object[partitionOffsets.size()];
int j = 0;
for (Map.Entry<Integer, CommittedOffset> partitionOffset : partitionOffsets.entrySet()) {
Struct partitionOffsetStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
partitionOffsetStruct.set(PARTITION_KEY_NAME, partitionOffset.getKey());
CommittedOffset committedOffset = partitionOffset.getValue();
partitionOffsetStruct.set(OFFSET_KEY_NAME, committedOffset.offset);
partitionOffsetStruct.set(METADATA_KEY_NAME, committedOffset.metadata);
partitionOffsetsArray[j++] = partitionOffsetStruct;
}
topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionOffsetsArray);
partitionsArray[i++] = topicPartitionsStruct;
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
return struct;
}
@Override
public TxnOffsetCommitResponse getErrorResponse(Throwable e) {
Errors error = Errors.forException(e);
Map<TopicPartition, Errors> errors = new HashMap<>(offsets.size());
for (TopicPartition partition : offsets.keySet())
errors.put(partition, error);
return new TxnOffsetCommitResponse(errors);
}
public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new TxnOffsetCommitRequest(ApiKeys.TXN_OFFSET_COMMIT.parseRequest(version, buffer), version);
}
public static class CommittedOffset {
private final long offset;
private final String metadata;
public CommittedOffset(long offset, String metadata) {
this.offset = offset;
this.metadata = metadata;
}
@Override
public String toString() {
return "CommittedOffset(" +
"offset=" + offset +
", metadata='" + metadata + "')";
}
public long offset() {
return offset;
}
public String metadata() {
return metadata;
}
}
}
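
A sketch of committing a single offset transactionally (values illustrative; java.util imports assumed):

    Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>();
    offsets.put(new TopicPartition("my-topic", 0),
            new TxnOffsetCommitRequest.CommittedOffset(42L, "some metadata"));
    TxnOffsetCommitRequest request = new TxnOffsetCommitRequest.Builder(
            "my-group", 1234L, (short) 0, 86400000L /* retention time ms */, offsets)
            .build((short) 0);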

org/apache/kafka/common/requests/TxnOffsetCommitResponse.java

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class TxnOffsetCommitResponse extends AbstractResponse {
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String PARTITIONS_KEY_NAME = "partitions";
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
// InvalidProducerEpoch
// NotCoordinator
// CoordinatorNotAvailable
// CoordinatorLoadInProgress
// OffsetMetadataTooLarge
// GroupAuthorizationFailed
// InvalidCommitOffsetSize
private final Map<TopicPartition, Errors> errors;
public TxnOffsetCommitResponse(Map<TopicPartition, Errors> errors) {
this.errors = errors;
}
public TxnOffsetCommitResponse(Struct struct) {
Map<TopicPartition, Errors> errors = new HashMap<>();
Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
for (Object topicPartitionObj : topicPartitionsArray) {
Struct topicPartitionStruct = (Struct) topicPartitionObj;
String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
Struct partitionStruct = (Struct) partitionObj;
Integer partition = partitionStruct.getInt(PARTITION_KEY_NAME);
Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
errors.put(new TopicPartition(topic, partition), error);
}
}
this.errors = errors;
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.responseSchema(version));
Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(errors);
Object[] partitionsArray = new Object[mappedPartitions.size()];
int i = 0;
for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
Struct topicPartitionsStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
Map<Integer, Errors> partitionAndErrors = topicAndPartitions.getValue();
Object[] partitionAndErrorsArray = new Object[partitionAndErrors.size()];
int j = 0;
for (Map.Entry<Integer, Errors> partitionAndError : partitionAndErrors.entrySet()) {
Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
partitionAndErrorStruct.set(PARTITION_KEY_NAME, partitionAndError.getKey());
partitionAndErrorStruct.set(ERROR_CODE_KEY_NAME, partitionAndError.getValue().code());
partitionAndErrorsArray[j++] = partitionAndErrorStruct;
}
topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionAndErrorsArray);
partitionsArray[i++] = topicPartitionsStruct;
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
return struct;
}
public Map<TopicPartition, Errors> errors() {
return errors;
}
public static TxnOffsetCommitResponse parse(ByteBuffer buffer, short version) {
return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer));
}
}
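
Unlike the other transactional responses above, the error code here is per partition, so callers iterate the map (sketch; buffer is an assumed ByteBuffer):

    TxnOffsetCommitResponse response = TxnOffsetCommitResponse.parse(buffer, (short) 0);
    for (Map.Entry<TopicPartition, Errors> entry : response.errors().entrySet())
        if (entry.getValue() != Errors.NONE)
            System.out.println(entry.getKey() + " failed: " + entry.getValue());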

org/apache/kafka/common/requests/WriteTxnMarkersRequest.java

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WriteTxnMarkersRequest extends AbstractRequest {
private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
private static final String PID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
private static final String TRANSACTION_RESULT_KEY_NAME = "transaction_result";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITIONS_KEY_NAME = "partitions";
public static class TxnMarkerEntry {
private final long producerId;
private final short producerEpoch;
private final TransactionResult result;
private final List<TopicPartition> partitions;
public TxnMarkerEntry(long producerId, short producerEpoch, TransactionResult result, List<TopicPartition> partitions) {
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.result = result;
this.partitions = partitions;
}
public long producerId() {
return producerId;
}
public short producerEpoch() {
return producerEpoch;
}
public TransactionResult transactionResult() {
return result;
}
public List<TopicPartition> partitions() {
return partitions;
}
}
public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest> {
private final int coordinatorEpoch;
private final List<TxnMarkerEntry> markers;
public Builder(int coordinatorEpoch, List<TxnMarkerEntry> markers) {
super(ApiKeys.WRITE_TXN_MARKERS);
this.markers = markers;
this.coordinatorEpoch = coordinatorEpoch;
}
@Override
public WriteTxnMarkersRequest build(short version) {
return new WriteTxnMarkersRequest(version, coordinatorEpoch, markers);
}
}
private final int coordinatorEpoch;
private final List<TxnMarkerEntry> markers;
private WriteTxnMarkersRequest(short version, int coordinatorEpoch, List<TxnMarkerEntry> markers) {
super(version);
this.markers = markers;
this.coordinatorEpoch = coordinatorEpoch;
}
public WriteTxnMarkersRequest(Struct struct, short version) {
super(version);
this.coordinatorEpoch = struct.getInt(COORDINATOR_EPOCH_KEY_NAME);
List<TxnMarkerEntry> markers = new ArrayList<>();
Object[] markersArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
for (Object markerObj : markersArray) {
Struct markerStruct = (Struct) markerObj;
long producerId = markerStruct.getLong(PID_KEY_NAME);
short producerEpoch = markerStruct.getShort(EPOCH_KEY_NAME);
TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
List<TopicPartition> partitions = new ArrayList<>();
Object[] topicPartitionsArray = markerStruct.getArray(TOPIC_PARTITIONS_KEY_NAME);
for (Object topicPartitionObj : topicPartitionsArray) {
Struct topicPartitionStruct = (Struct) topicPartitionObj;
String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
partitions.add(new TopicPartition(topic, (Integer) partitionObj));
}
}
markers.add(new TxnMarkerEntry(producerId, producerEpoch, result, partitions));
}
this.markers = markers;
}
public int coordinatorEpoch() {
return coordinatorEpoch;
}
public List<TxnMarkerEntry> markers() {
return markers;
}
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.requestSchema(version()));
struct.set(COORDINATOR_EPOCH_KEY_NAME, coordinatorEpoch);
Object[] markersArray = new Object[markers.size()];
int i = 0;
for (TxnMarkerEntry entry : markers) {
Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
markerStruct.set(PID_KEY_NAME, entry.producerId);
markerStruct.set(EPOCH_KEY_NAME, entry.producerEpoch);
markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id);
Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(entry.partitions);
Object[] partitionsArray = new Object[mappedPartitions.size()];
int j = 0;
for (Map.Entry<String, List<Integer>> topicAndPartitions : mappedPartitions.entrySet()) {
Struct topicPartitionsStruct = markerStruct.instance(TOPIC_PARTITIONS_KEY_NAME);
topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
topicPartitionsStruct.set(PARTITIONS_KEY_NAME, topicAndPartitions.getValue().toArray());
partitionsArray[j++] = topicPartitionsStruct;
}
markerStruct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
markersArray[i++] = markerStruct;
}
struct.set(TXN_MARKER_ENTRY_KEY_NAME, markersArray);
return struct;
}
@Override
public WriteTxnMarkersResponse getErrorResponse(Throwable e) {
Errors error = Errors.forException(e);
Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(markers.size());
for (TxnMarkerEntry entry : markers) {
Map<TopicPartition, Errors> errorsPerPartition = new HashMap<>(entry.partitions.size());
for (TopicPartition partition : entry.partitions)
errorsPerPartition.put(partition, error);
errors.put(entry.producerId, errorsPerPartition);
}
return new WriteTxnMarkersResponse(errors);
}
public static WriteTxnMarkersRequest parse(ByteBuffer buffer, short version) {
return new WriteTxnMarkersRequest(ApiKeys.WRITE_TXN_MARKERS.parseRequest(version, buffer), version);
}
}
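To illustrate the intended call pattern (a sketch, not code from this commit): the transaction coordinator would describe each pending marker as a TxnMarkerEntry and hand the list to the Builder. All concrete values below, including the explicit version 0, are illustrative assumptions:

// Build a WriteTxnMarkersRequest carrying a single COMMIT marker for
// producer 21 at epoch 0, targeting one partition; coordinator epoch 1.
WriteTxnMarkersRequest.TxnMarkerEntry marker = new WriteTxnMarkersRequest.TxnMarkerEntry(
        21L, (short) 0, TransactionResult.COMMIT,
        Collections.singletonList(new TopicPartition("topic", 0)));
WriteTxnMarkersRequest request =
        new WriteTxnMarkersRequest.Builder(1, Collections.singletonList(marker)).build((short) 0);

Note that callers may pass a flat partition list: toStruct() regroups the partitions by topic via CollectionUtils.groupDataByTopic before serialization.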

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class WriteTxnMarkersResponse extends AbstractResponse {
private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
private static final String PID_KEY_NAME = "producer_id";
private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
private static final String PARTITIONS_KEY_NAME = "partitions";
private static final String TOPIC_KEY_NAME = "topic";
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Possible error codes:
// CorruptRecord
// InvalidProducerEpoch
// UnknownTopicOrPartition
// NotLeaderForPartition
// MessageTooLarge
// RecordListTooLarge
// NotEnoughReplicas
// NotEnoughReplicasAfterAppend
// InvalidRequiredAcks
private final Map<Long, Map<TopicPartition, Errors>> errors;
public WriteTxnMarkersResponse(Map<Long, Map<TopicPartition, Errors>> errors) {
this.errors = errors;
}
public WriteTxnMarkersResponse(Struct struct) {
Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>();
Object[] responseArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
for (Object responseObj : responseArray) {
Struct responseStruct = (Struct) responseObj;
long producerId = responseStruct.getLong(PID_KEY_NAME);
Map<TopicPartition, Errors> errorPerPartition = new HashMap<>();
Object[] topicPartitionsArray = responseStruct.getArray(TOPIC_PARTITIONS_KEY_NAME);
for (Object topicPartitionObj : topicPartitionsArray) {
Struct topicPartitionStruct = (Struct) topicPartitionObj;
String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
Struct partitionStruct = (Struct) partitionObj;
Integer partition = partitionStruct.getInt(PARTITION_KEY_NAME);
Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
errorPerPartition.put(new TopicPartition(topic, partition), error);
}
}
errors.put(producerId, errorPerPartition);
}
this.errors = errors;
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.responseSchema(version));
Object[] responsesArray = new Object[errors.size()];
int k = 0;
for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) {
Struct responseStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
responseStruct.set(PID_KEY_NAME, responseEntry.getKey());
Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue();
Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(partitionAndErrors);
Object[] partitionsArray = new Object[mappedPartitions.size()];
int i = 0;
for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
Struct topicPartitionsStruct = responseStruct.instance(TOPIC_PARTITIONS_KEY_NAME);
topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
Map<Integer, Errors> partitionIdAndErrors = topicAndPartitions.getValue();
Object[] partitionAndErrorsArray = new Object[partitionIdAndErrors.size()];
int j = 0;
for (Map.Entry<Integer, Errors> partitionAndError : partitionIdAndErrors.entrySet()) {
Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
partitionAndErrorStruct.set(PARTITION_KEY_NAME, partitionAndError.getKey());
partitionAndErrorStruct.set(ERROR_CODE_KEY_NAME, partitionAndError.getValue().code());
partitionAndErrorsArray[j++] = partitionAndErrorStruct;
}
topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionAndErrorsArray);
partitionsArray[i++] = topicPartitionsStruct;
}
responseStruct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
responsesArray[k++] = responseStruct;
}
struct.set(TXN_MARKER_ENTRY_KEY_NAME, responsesArray);
return struct;
}
public Map<TopicPartition, Errors> errors(long producerId) {
return errors.get(producerId);
}
public static WriteTxnMarkersResponse parse(ByteBuffer buffer, short version) {
return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer));
}
}
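A short construction-and-lookup sketch (illustrative values, not from the commit), showing that errors are keyed first by producer id and then by partition:

// One producer (21) with one partition result; errors(21L) returns the
// inner per-partition map.
Map<TopicPartition, Errors> perPartition = new HashMap<>();
perPartition.put(new TopicPartition("topic", 0), Errors.NONE);
Map<Long, Map<TopicPartition, Errors>> all = new HashMap<>();
all.put(21L, perPartition);
WriteTxnMarkersResponse response = new WriteTxnMarkersResponse(all);
Errors error = response.errors(21L).get(new TopicPartition("topic", 0)); // Errors.NONE

For a producer id that was not present in the response, errors(producerId) simply returns null.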

View File

@ -30,10 +30,10 @@ import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Utils;
import org.junit.Test;
@ -136,9 +136,27 @@ public class RequestResponseTest {
checkRequest(createDeleteTopicsRequest());
checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException());
checkResponse(createDeleteTopicsResponse(), 0);
checkRequest(createInitPidRequest());
checkErrorResponse(createInitPidRequest(), new UnknownServerException());
checkResponse(createInitPidResponse(), 0);
checkRequest(createAddPartitionsToTxnRequest());
checkResponse(createAddPartitionsToTxnResponse(), 0);
checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException());
checkRequest(createAddOffsetsToTxnRequest());
checkResponse(createAddOffsetsToTxnResponse(), 0);
checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException());
checkRequest(createEndTxnRequest());
checkResponse(createEndTxnResponse(), 0);
checkErrorResponse(createEndTxnRequest(), new UnknownServerException());
checkRequest(createWriteTxnMarkersRequest());
checkResponse(createWriteTxnMarkersResponse(), 0);
checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException());
checkRequest(createTxnOffsetCommitRequest());
checkResponse(createTxnOffsetCommitResponse(), 0);
checkErrorResponse(createTxnOffsetCommitRequest(), new UnknownServerException());
checkOlderFetchVersions();
checkResponse(createMetadataResponse(), 0);
checkResponse(createMetadataResponse(), 1);
@ -166,6 +184,21 @@ public class RequestResponseTest {
checkRequest(createLeaderEpochRequest());
checkResponse(createLeaderEpochResponse(), 0);
checkErrorResponse(createLeaderEpochRequest(), new UnknownServerException());
checkRequest(createAddPartitionsToTxnRequest());
checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException());
checkResponse(createAddPartitionsToTxnResponse(), 0);
checkRequest(createAddOffsetsToTxnRequest());
checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException());
checkResponse(createAddOffsetsToTxnResponse(), 0);
checkRequest(createEndTxnRequest());
checkErrorResponse(createEndTxnRequest(), new UnknownServerException());
checkResponse(createEndTxnResponse(), 0);
checkRequest(createWriteTxnMarkersRequest());
checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException());
checkResponse(createWriteTxnMarkersResponse(), 0);
checkRequest(createTxnOffsetCommitRequest());
checkErrorResponse(createTxnOffsetCommitRequest(), new UnknownServerException());
checkResponse(createTxnOffsetCommitResponse(), 0);
}
@Test
@ -821,6 +854,58 @@ public class RequestResponseTest {
return new OffsetsForLeaderEpochResponse(epochs);
}
private AddPartitionsToTxnRequest createAddPartitionsToTxnRequest() {
return new AddPartitionsToTxnRequest.Builder("tid", 21L, (short) 42,
Collections.singletonList(new TopicPartition("topic", 73))).build();
}
private AddPartitionsToTxnResponse createAddPartitionsToTxnResponse() {
return new AddPartitionsToTxnResponse(Errors.NONE);
}
private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() {
return new AddOffsetsToTxnRequest.Builder("tid", 21L, (short) 42, "gid").build();
}
private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() {
return new AddOffsetsToTxnResponse(Errors.NONE);
}
private EndTxnRequest createEndTxnRequest() {
return new EndTxnRequest.Builder("tid", 21L, (short) 42, TransactionResult.COMMIT).build();
}
private EndTxnResponse createEndTxnResponse() {
return new EndTxnResponse(Errors.NONE);
}
private WriteTxnMarkersRequest createWriteTxnMarkersRequest() {
return new WriteTxnMarkersRequest.Builder(73,
Collections.singletonList(new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, TransactionResult.ABORT,
Collections.singletonList(new TopicPartition("topic", 73))))).build();
}
private WriteTxnMarkersResponse createWriteTxnMarkersResponse() {
final Map<TopicPartition, Errors> errorPerPartitions = new HashMap<>();
errorPerPartitions.put(new TopicPartition("topic", 73), Errors.NONE);
final Map<Long, Map<TopicPartition, Errors>> response = new HashMap<>();
response.put(21L, errorPerPartitions);
return new WriteTxnMarkersResponse(response);
}
private TxnOffsetCommitRequest createTxnOffsetCommitRequest() {
final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic", 73),
new TxnOffsetCommitRequest.CommittedOffset(100, null));
return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, 73, offsets).build();
}
private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {
final Map<TopicPartition, Errors> errorPerPartitions = new HashMap<>();
errorPerPartitions.put(new TopicPartition("topic", 73), Errors.NONE);
return new TxnOffsetCommitResponse(errorPerPartitions);
}
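The checkRequest/checkErrorResponse/checkResponse helpers used above are defined earlier in this test class; conceptually each performs a serialize-and-reparse round trip. A rough sketch of that idea for the new marker request (the round-trip shape is an assumption about the helpers, not their actual bodies; toStruct() is reachable because the test shares the requests package):

// Sketch: render the request into its Struct form, reparse it at the
// same version, and confirm a representative field survives the trip.
WriteTxnMarkersRequest original = createWriteTxnMarkersRequest();
Struct struct = original.toStruct();
WriteTxnMarkersRequest reparsed = new WriteTxnMarkersRequest(struct, original.version());
assertEquals(original.coordinatorEpoch(), reparsed.coordinatorEpoch());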
private static class ByteBufferChannel implements GatheringByteChannel {
private final ByteBuffer buf;
private boolean closed = false;

View File

@ -31,7 +31,7 @@ import kafka.controller.KafkaController
import kafka.coordinator.{GroupCoordinator, InitPidResult, JoinGroupResult, TransactionCoordinator}
import kafka.log._
import kafka.network._
import kafka.network.RequestChannel.{Response, Session}
import kafka.network.RequestChannel.{Request, Response, Session}
import kafka.security.auth
import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
@ -40,7 +40,7 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
import org.apache.kafka.common.record.{RecordBatch, MemoryRecords, TimestampType}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, TimestampType}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
@ -104,7 +104,11 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN => handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
}
} catch {
case e: FatalExitError => throw e
@ -1323,6 +1327,28 @@ class KafkaApis(val requestChannel: RequestChannel,
txnCoordinator.handleInitPid(initPidRequest.transactionalId, sendResponseCallback)
}
def handleEndTxnRequest(request: Request): Unit = {
requestChannel.sendResponse(new RequestChannel.Response(request, new EndTxnResponse(Errors.UNSUPPORTED_VERSION)))
}
def handleAddPartitionToTxnRequest(request: Request): Unit = {
requestChannel.sendResponse(new RequestChannel.Response(request, new AddPartitionsToTxnResponse(Errors.UNSUPPORTED_VERSION)))
}
def handleAddOffsetsToTxnRequest(request: Request): Unit = {
requestChannel.sendResponse(new RequestChannel.Response(request, new AddOffsetsToTxnResponse(Errors.UNSUPPORTED_VERSION)))
}
def handleWriteTxnMarkersRequest(request: Request): Unit = {
val emptyResponse = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
requestChannel.sendResponse(new RequestChannel.Response(request, new WriteTxnMarkersResponse(emptyResponse)))
}
def handleTxnOffsetCommitRequest(request: Request): Unit = {
val emptyResponse = new java.util.HashMap[TopicPartition, Errors]()
requestChannel.sendResponse(new RequestChannel.Response(request, new TxnOffsetCommitResponse(emptyResponse)))
}
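All five handlers above are deliberate stubs: until the coordinator-side transaction logic lands, EndTxn, AddPartitionsToTxn and AddOffsetsToTxn answer with UNSUPPORTED_VERSION, while WriteTxnMarkers and TxnOffsetCommit return empty error maps. A hedged sketch of what a client would currently observe from the EndTxn stub (how the response is obtained is elided, and error() exposing the top-level code is an assumption):

// Illustrative client-side reaction to the stubbed EndTxn handler.
EndTxnResponse response = endTxnResponseFromBroker(); // hypothetical accessor
if (response.error() == Errors.UNSUPPORTED_VERSION)
    throw new UnsupportedVersionException("EndTxn is not yet supported by this broker");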
def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
val offsetForEpoch = request.body[OffsetsForLeaderEpochRequest]
val requestInfo = offsetForEpoch.epochsByTopicPartition()