KAFKA-8590; Use automated TxnOffsetCommit type and add tests for OffsetCommit (#6994)

This PR migrates the TxnOffsetCommit protocol to the auto-generated message types and adds more unit test coverage for the plain OffsetCommit protocol.

Reviewers: Jason Gustafson <jason@confluent.io>
Authored by Boyang Chen on 2019-09-05 23:07:42 -07:00; committed by Jason Gustafson
parent ad3ccf8f31
commit c0019e6538
18 changed files with 823 additions and 347 deletions
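For illustration, a minimal before/after sketch of the builder migration performed in this diff. The TopicPartition, offset, and field values are hypothetical; the types, setters, and the getTopics helper are the ones shown in the diff below. The two forms would not coexist in one build (the old constructor is removed here); they are placed side by side only for comparison.

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;

Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic", 0),
    new TxnOffsetCommitRequest.CommittedOffset(100L, "metadata", Optional.empty()));

// Before: every field was a separate constructor argument.
TxnOffsetCommitRequest.Builder oldBuilder = new TxnOffsetCommitRequest.Builder(
    "transactionalId", "groupId", 21L, (short) 42, offsets);

// After: the builder wraps an auto-generated TxnOffsetCommitRequestData object;
// the offsets map is converted with the new TxnOffsetCommitRequest.getTopics helper.
TxnOffsetCommitRequest.Builder newBuilder = new TxnOffsetCommitRequest.Builder(
    new TxnOffsetCommitRequestData()
        .setTransactionalId("transactionalId")
        .setGroupId("groupId")
        .setProducerId(21L)
        .setProducerEpoch((short) 42)
        .setTopics(TxnOffsetCommitRequest.getTopics(offsets)));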

View File: checkstyle/suppressions.xml

@@ -65,7 +65,7 @@
files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message)Test.java"/>
<suppress checks="ClassFanOutComplexity"
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient)Test.java"/>
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient|Message)Test.java"/>
<suppress checks="ClassFanOutComplexity"
files="MockAdminClient.java"/>

View File: TransactionManager.java

@@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
@@ -986,8 +987,14 @@ public class TransactionManager {
offsetAndMetadata.metadata(), offsetAndMetadata.leaderEpoch());
pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
}
TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(transactionalId, consumerGroupId,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, pendingTxnOffsetCommits);
TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(
new TxnOffsetCommitRequestData()
.setTransactionalId(transactionalId)
.setGroupId(consumerGroupId)
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch)
.setTopics(TxnOffsetCommitRequest.getTopics(pendingTxnOffsetCommits))
);
return new TxnOffsetCommitHandler(result, builder);
}
@@ -1432,7 +1439,7 @@ public class TransactionManager {
@Override
String coordinatorKey() {
return builder.consumerGroupId();
return builder.data.groupId();
}
@Override
@@ -1441,7 +1448,7 @@ public class TransactionManager {
boolean coordinatorReloaded = false;
Map<TopicPartition, Errors> errors = txnOffsetCommitResponse.errors();
log.debug("Received TxnOffsetCommit response for consumer group {}: {}", builder.consumerGroupId(),
log.debug("Received TxnOffsetCommit response for consumer group {}: {}", builder.data.groupId(),
errors);
for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
@@ -1454,14 +1461,14 @@
|| error == Errors.REQUEST_TIMED_OUT) {
if (!coordinatorReloaded) {
coordinatorReloaded = true;
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, builder.consumerGroupId());
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, builder.data.groupId());
}
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION
|| error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
// If the topic is unknown or the coordinator is loading, retry with the current coordinator
continue;
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId()));
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
break;
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
|| error == Errors.INVALID_PRODUCER_EPOCH

View File: ApiKeys.java

@@ -66,6 +66,8 @@ import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
@@ -109,8 +111,6 @@ import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.StopReplicaResponse;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.requests.UpdateMetadataResponse;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
@@ -172,8 +172,8 @@ public enum ApiKeys {
EndTxnResponse.schemaVersions()),
WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequest.schemaVersions(),
WriteTxnMarkersResponse.schemaVersions()),
TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequest.schemaVersions(),
TxnOffsetCommitResponse.schemaVersions()),
TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequestData.SCHEMAS,
TxnOffsetCommitResponseData.SCHEMAS),
DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequest.schemaVersions(), DescribeAclsResponse.schemaVersions()),
CREATE_ACLS(30, "CreateAcls", CreateAclsRequest.schemaVersions(), CreateAclsResponse.schemaVersions()),
DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequest.schemaVersions(), DeleteAclsResponse.schemaVersions()),

View File: AbstractResponse.java

@@ -127,7 +127,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case WRITE_TXN_MARKERS:
return new WriteTxnMarkersResponse(struct);
case TXN_OFFSET_COMMIT:
return new TxnOffsetCommitResponse(struct);
return new TxnOffsetCommitResponse(struct, version);
case DESCRIBE_ACLS:
return new DescribeAclsResponse(struct);
case CREATE_ACLS:

View File: OffsetCommitRequest.java

@@ -19,7 +19,10 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -88,7 +91,7 @@ public class OffsetCommitRequest extends AbstractRequest {
public Map<TopicPartition, Long> offsets() {
Map<TopicPartition, Long> offsets = new HashMap<>();
for (OffsetCommitRequestData.OffsetCommitRequestTopic topic : data.topics()) {
for (OffsetCommitRequestTopic topic : data.topics()) {
for (OffsetCommitRequestData.OffsetCommitRequestPartition partition : topic.partitions()) {
offsets.put(new TopicPartition(topic.name(), partition.partitionIndex()),
partition.committedOffset());
@@ -97,20 +100,19 @@
return offsets;
}
public static List<OffsetCommitResponseData.OffsetCommitResponseTopic> getErrorResponseTopics(
List<OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopics,
public static List<OffsetCommitResponseTopic> getErrorResponseTopics(
List<OffsetCommitRequestTopic> requestTopics,
Errors e) {
List<OffsetCommitResponseData.OffsetCommitResponseTopic>
responseTopicData = new ArrayList<>();
for (OffsetCommitRequestData.OffsetCommitRequestTopic entry : requestTopics) {
List<OffsetCommitResponseData.OffsetCommitResponsePartition> responsePartitions =
List<OffsetCommitResponseTopic> responseTopicData = new ArrayList<>();
for (OffsetCommitRequestTopic entry : requestTopics) {
List<OffsetCommitResponsePartition> responsePartitions =
new ArrayList<>();
for (OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition : entry.partitions()) {
responsePartitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setPartitionIndex(requestPartition.partitionIndex())
.setErrorCode(e.code()));
responsePartitions.add(new OffsetCommitResponsePartition()
.setPartitionIndex(requestPartition.partitionIndex())
.setErrorCode(e.code()));
}
responseTopicData.add(new OffsetCommitResponseData.OffsetCommitResponseTopic()
responseTopicData.add(new OffsetCommitResponseTopic()
.setName(entry.name())
.setPartitions(responsePartitions)
);
@@ -119,8 +121,8 @@ public class OffsetCommitRequest extends AbstractRequest {
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<OffsetCommitResponseData.OffsetCommitResponseTopic>
public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<OffsetCommitResponseTopic>
responseTopicData = getErrorResponseTopics(data.topics(), Errors.forException(e));
short versionId = version();

View File: OffsetCommitResponse.java

@@ -18,6 +18,8 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -30,18 +32,18 @@ import java.util.Map;
/**
* Possible error codes:
*
* UNKNOWN_TOPIC_OR_PARTITION (3)
* REQUEST_TIMED_OUT (7)
* OFFSET_METADATA_TOO_LARGE (12)
* COORDINATOR_LOAD_IN_PROGRESS (14)
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
* REBALANCE_IN_PROGRESS (27)
* INVALID_COMMIT_OFFSET_SIZE (28)
* TOPIC_AUTHORIZATION_FAILED (29)
* GROUP_AUTHORIZATION_FAILED (30)
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
* - {@link Errors#REQUEST_TIMED_OUT}
* - {@link Errors#OFFSET_METADATA_TOO_LARGE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#ILLEGAL_GENERATION}
* - {@link Errors#UNKNOWN_MEMBER_ID}
* - {@link Errors#REBALANCE_IN_PROGRESS}
* - {@link Errors#INVALID_COMMIT_OFFSET_SIZE}
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
*/
public class OffsetCommitResponse extends AbstractResponse {
@@ -52,23 +54,19 @@ public class OffsetCommitResponse extends AbstractResponse {
}
public OffsetCommitResponse(int requestThrottleMs, Map<TopicPartition, Errors> responseData) {
Map<String, OffsetCommitResponseData.OffsetCommitResponseTopic>
Map<String, OffsetCommitResponseTopic>
responseTopicDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, Errors> entry : responseData.entrySet()) {
TopicPartition topicPartition = entry.getKey();
String topicName = topicPartition.topic();
OffsetCommitResponseData.OffsetCommitResponseTopic topic = responseTopicDataMap
.getOrDefault(topicName, new OffsetCommitResponseData.OffsetCommitResponseTopic());
OffsetCommitResponseTopic topic = responseTopicDataMap.getOrDefault(
topicName, new OffsetCommitResponseTopic().setName(topicName));
if (topic.name().equals("")) {
topic.setName(topicName);
}
topic.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
.setErrorCode(entry.getValue().code())
.setPartitionIndex(topicPartition.partition())
);
topic.partitions().add(new OffsetCommitResponsePartition()
.setErrorCode(entry.getValue().code())
.setPartitionIndex(topicPartition.partition()));
responseTopicDataMap.put(topicName, topic);
}
@@ -97,8 +95,8 @@ public class OffsetCommitResponse extends AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
Map<TopicPartition, Errors> errorMap = new HashMap<>();
for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : data.topics()) {
for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
for (OffsetCommitResponseTopic topic : data.topics()) {
for (OffsetCommitResponsePartition partition : topic.partitions()) {
errorMap.put(new TopicPartition(topic.name(), partition.partitionIndex()),
Errors.forCode(partition.errorCode()));
}

View File: TxnOffsetCommitRequest.java

@@ -17,225 +17,129 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_METADATA;
import static org.apache.kafka.common.protocol.CommonFields.COMMITTED_OFFSET;
import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
import java.util.stream.Collectors;
public class TxnOffsetCommitRequest extends AbstractRequest {
// top level fields
private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
"Topics to commit offsets");
// topic level fields
private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
"Partitions to commit offsets");
private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
PARTITION_ID,
COMMITTED_OFFSET,
COMMITTED_METADATA);
private static final Field TOPICS_V0 = TOPICS.withFields(
TOPIC_NAME,
PARTITIONS_V0);
private static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema(
TRANSACTIONAL_ID,
GROUP_ID,
PRODUCER_ID,
PRODUCER_EPOCH,
TOPICS_V0);
// V1 bump used to indicate that on quota violation brokers send out responses before throttling.
private static final Schema TXN_OFFSET_COMMIT_REQUEST_V1 = TXN_OFFSET_COMMIT_REQUEST_V0;
// V2 adds the leader epoch to the partition data
private static final Field PARTITIONS_V2 = PARTITIONS.withFields(
PARTITION_ID,
COMMITTED_OFFSET,
COMMITTED_LEADER_EPOCH,
COMMITTED_METADATA);
private static final Field TOPICS_V2 = TOPICS.withFields(
TOPIC_NAME,
PARTITIONS_V2);
private static final Schema TXN_OFFSET_COMMIT_REQUEST_V2 = new Schema(
TRANSACTIONAL_ID,
GROUP_ID,
PRODUCER_ID,
PRODUCER_EPOCH,
TOPICS_V2);
public static Schema[] schemaVersions() {
return new Schema[]{TXN_OFFSET_COMMIT_REQUEST_V0, TXN_OFFSET_COMMIT_REQUEST_V1, TXN_OFFSET_COMMIT_REQUEST_V2};
}
public final TxnOffsetCommitRequestData data;
public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> {
private final String transactionalId;
private final String consumerGroupId;
private final long producerId;
private final short producerEpoch;
private final Map<TopicPartition, CommittedOffset> offsets;
public Builder(String transactionalId, String consumerGroupId, long producerId, short producerEpoch,
Map<TopicPartition, CommittedOffset> offsets) {
public final TxnOffsetCommitRequestData data;
public Builder(TxnOffsetCommitRequestData data) {
super(ApiKeys.TXN_OFFSET_COMMIT);
this.transactionalId = transactionalId;
this.consumerGroupId = consumerGroupId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.offsets = offsets;
}
public String consumerGroupId() {
return consumerGroupId;
}
public Map<TopicPartition, CommittedOffset> offsets() {
return offsets;
this.data = data;
}
@Override
public TxnOffsetCommitRequest build(short version) {
return new TxnOffsetCommitRequest(version, transactionalId, consumerGroupId, producerId, producerEpoch, offsets);
return new TxnOffsetCommitRequest(data, version);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=TxnOffsetCommitRequest").
append(", transactionalId=").append(transactionalId).
append(", producerId=").append(producerId).
append(", producerEpoch=").append(producerEpoch).
append(", consumerGroupId=").append(consumerGroupId).
append(", offsets=").append(offsets).
append(")");
return bld.toString();
return data.toString();
}
}
private final String transactionalId;
private final String consumerGroupId;
private final long producerId;
private final short producerEpoch;
private final Map<TopicPartition, CommittedOffset> offsets;
public TxnOffsetCommitRequest(short version, String transactionalId, String consumerGroupId, long producerId,
short producerEpoch, Map<TopicPartition, CommittedOffset> offsets) {
public TxnOffsetCommitRequest(TxnOffsetCommitRequestData data, short version) {
super(ApiKeys.TXN_OFFSET_COMMIT, version);
this.transactionalId = transactionalId;
this.consumerGroupId = consumerGroupId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.offsets = offsets;
this.data = data;
}
public TxnOffsetCommitRequest(Struct struct, short version) {
super(ApiKeys.TXN_OFFSET_COMMIT, version);
this.transactionalId = struct.get(TRANSACTIONAL_ID);
this.consumerGroupId = struct.get(GROUP_ID);
this.producerId = struct.get(PRODUCER_ID);
this.producerEpoch = struct.get(PRODUCER_EPOCH);
Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
Object[] topicPartitionsArray = struct.get(TOPICS);
for (Object topicPartitionObj : topicPartitionsArray) {
Struct topicPartitionStruct = (Struct) topicPartitionObj;
String topic = topicPartitionStruct.get(TOPIC_NAME);
for (Object partitionObj : topicPartitionStruct.get(PARTITIONS)) {
Struct partitionStruct = (Struct) partitionObj;
TopicPartition partition = new TopicPartition(topic, partitionStruct.get(PARTITION_ID));
long offset = partitionStruct.get(COMMITTED_OFFSET);
String metadata = partitionStruct.get(COMMITTED_METADATA);
Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionStruct, COMMITTED_LEADER_EPOCH);
offsets.put(partition, new CommittedOffset(offset, metadata, leaderEpoch));
}
}
this.offsets = offsets;
}
public String transactionalId() {
return transactionalId;
}
public String consumerGroupId() {
return consumerGroupId;
}
public long producerId() {
return producerId;
}
public short producerEpoch() {
return producerEpoch;
this.data = new TxnOffsetCommitRequestData(struct, version);
}
public Map<TopicPartition, CommittedOffset> offsets() {
return offsets;
List<TxnOffsetCommitRequestTopic> topics = data.topics();
Map<TopicPartition, CommittedOffset> offsetMap = new HashMap<>();
for (TxnOffsetCommitRequestTopic topic : topics) {
for (TxnOffsetCommitRequestPartition partition : topic.partitions()) {
offsetMap.put(new TopicPartition(topic.name(), partition.partitionIndex()),
new CommittedOffset(partition.committedOffset(),
partition.committedMetadata(),
RequestUtils.getLeaderEpoch(partition.committedLeaderEpoch()))
);
}
}
return offsetMap;
}
public static List<TxnOffsetCommitRequestTopic> getTopics(Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits) {
Map<String, List<TxnOffsetCommitRequestPartition>> topicPartitionMap = new HashMap<>();
for (Map.Entry<TopicPartition, CommittedOffset> entry : pendingTxnOffsetCommits.entrySet()) {
TopicPartition topicPartition = entry.getKey();
CommittedOffset offset = entry.getValue();
List<TxnOffsetCommitRequestPartition> partitions =
topicPartitionMap.getOrDefault(topicPartition.topic(), new ArrayList<>());
partitions.add(new TxnOffsetCommitRequestPartition()
.setPartitionIndex(topicPartition.partition())
.setCommittedOffset(offset.offset)
.setCommittedLeaderEpoch(offset.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setCommittedMetadata(offset.metadata)
);
topicPartitionMap.put(topicPartition.topic(), partitions);
}
return topicPartitionMap.entrySet().stream()
.map(entry -> new TxnOffsetCommitRequestTopic()
.setName(entry.getKey())
.setPartitions(entry.getValue()))
.collect(Collectors.toList());
}
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version()));
struct.set(TRANSACTIONAL_ID, transactionalId);
struct.set(GROUP_ID, consumerGroupId);
struct.set(PRODUCER_ID, producerId);
struct.set(PRODUCER_EPOCH, producerEpoch);
return data.toStruct(version());
}
Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupPartitionDataByTopic(offsets);
Object[] partitionsArray = new Object[mappedPartitionOffsets.size()];
int i = 0;
for (Map.Entry<String, Map<Integer, CommittedOffset>> topicAndPartitions : mappedPartitionOffsets.entrySet()) {
Struct topicPartitionsStruct = struct.instance(TOPICS);
topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
Map<Integer, CommittedOffset> partitionOffsets = topicAndPartitions.getValue();
Object[] partitionOffsetsArray = new Object[partitionOffsets.size()];
int j = 0;
for (Map.Entry<Integer, CommittedOffset> partitionOffset : partitionOffsets.entrySet()) {
Struct partitionOffsetStruct = topicPartitionsStruct.instance(PARTITIONS);
partitionOffsetStruct.set(PARTITION_ID, partitionOffset.getKey());
CommittedOffset committedOffset = partitionOffset.getValue();
partitionOffsetStruct.set(COMMITTED_OFFSET, committedOffset.offset);
partitionOffsetStruct.set(COMMITTED_METADATA, committedOffset.metadata);
RequestUtils.setLeaderEpochIfExists(partitionOffsetStruct, COMMITTED_LEADER_EPOCH,
committedOffset.leaderEpoch);
partitionOffsetsArray[j++] = partitionOffsetStruct;
static List<TxnOffsetCommitResponseTopic> getErrorResponseTopics(List<TxnOffsetCommitRequestTopic> requestTopics,
Errors e) {
List<TxnOffsetCommitResponseTopic> responseTopicData = new ArrayList<>();
for (TxnOffsetCommitRequestTopic entry : requestTopics) {
List<TxnOffsetCommitResponsePartition> responsePartitions = new ArrayList<>();
for (TxnOffsetCommitRequestPartition requestPartition : entry.partitions()) {
responsePartitions.add(new TxnOffsetCommitResponsePartition()
.setPartitionIndex(requestPartition.partitionIndex())
.setErrorCode(e.code()));
}
topicPartitionsStruct.set(PARTITIONS, partitionOffsetsArray);
partitionsArray[i++] = topicPartitionsStruct;
responseTopicData.add(new TxnOffsetCommitResponseTopic()
.setName(entry.name())
.setPartitions(responsePartitions)
);
}
struct.set(TOPICS, partitionsArray);
return struct;
return responseTopicData;
}
@Override
public TxnOffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
Map<TopicPartition, Errors> errors = new HashMap<>(offsets.size());
for (TopicPartition partition : offsets.keySet())
errors.put(partition, error);
return new TxnOffsetCommitResponse(throttleTimeMs, errors);
List<TxnOffsetCommitResponseTopic> responseTopicData =
getErrorResponseTopics(data.topics(), Errors.forException(e));
return new TxnOffsetCommitResponse(new TxnOffsetCommitResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setTopics(responseTopicData));
}
public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) {
@@ -260,6 +164,22 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
", leaderEpoch=" + leaderEpoch +
", metadata='" + metadata + "')";
}
}
@Override
public boolean equals(Object other) {
if (!(other instanceof CommittedOffset)) {
return false;
}
CommittedOffset otherOffset = (CommittedOffset) other;
return this.offset == otherOffset.offset
&& this.leaderEpoch.equals(otherOffset.leaderEpoch)
&& Objects.equals(this.metadata, otherOffset.metadata);
}
@Override
public int hashCode() {
return Objects.hash(offset, leaderEpoch, metadata);
}
}
}

View File: TxnOffsetCommitResponse.java

@@ -17,142 +17,98 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
/**
*
* Possible error codes:
* InvalidProducerEpoch
* NotCoordinator
* CoordinatorNotAvailable
* CoordinatorLoadInProgress
* OffsetMetadataTooLarge
* GroupAuthorizationFailed
* InvalidCommitOffsetSize
* TransactionalIdAuthorizationFailed
* RequestTimedOut
*
* - {@link Errors#INVALID_PRODUCER_EPOCH}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#OFFSET_METADATA_TOO_LARGE}
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#INVALID_COMMIT_OFFSET_SIZE}
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
* - {@link Errors#REQUEST_TIMED_OUT}
*/
public class TxnOffsetCommitResponse extends AbstractResponse {
private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
"Responses by topic for committed offsets");
// topic level fields
private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
"Responses by partition for committed offsets");
public final TxnOffsetCommitResponseData data;
private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
PARTITION_ID,
ERROR_CODE);
private static final Field TOPICS_V0 = TOPICS.withFields(
TOPIC_NAME,
PARTITIONS_V0);
private static final Schema TXN_OFFSET_COMMIT_RESPONSE_V0 = new Schema(
THROTTLE_TIME_MS,
TOPICS_V0);
// V1 bump used to indicate that on quota violation brokers send out responses before throttling.
private static final Schema TXN_OFFSET_COMMIT_RESPONSE_V1 = TXN_OFFSET_COMMIT_RESPONSE_V0;
// V2 adds the leader epoch to the partition data
private static final Schema TXN_OFFSET_COMMIT_RESPONSE_V2 = TXN_OFFSET_COMMIT_RESPONSE_V1;
public static Schema[] schemaVersions() {
return new Schema[]{TXN_OFFSET_COMMIT_RESPONSE_V0, TXN_OFFSET_COMMIT_RESPONSE_V1, TXN_OFFSET_COMMIT_RESPONSE_V2};
public TxnOffsetCommitResponse(TxnOffsetCommitResponseData data) {
this.data = data;
}
private final Map<TopicPartition, Errors> errors;
private final int throttleTimeMs;
public TxnOffsetCommitResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {
this.throttleTimeMs = throttleTimeMs;
this.errors = errors;
public TxnOffsetCommitResponse(Struct struct, short version) {
this.data = new TxnOffsetCommitResponseData(struct, version);
}
public TxnOffsetCommitResponse(Struct struct) {
this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
Map<TopicPartition, Errors> errors = new HashMap<>();
Object[] topicPartitionsArray = struct.get(TOPICS);
for (Object topicPartitionObj : topicPartitionsArray) {
Struct topicPartitionStruct = (Struct) topicPartitionObj;
String topic = topicPartitionStruct.get(TOPIC_NAME);
for (Object partitionObj : topicPartitionStruct.get(PARTITIONS)) {
Struct partitionStruct = (Struct) partitionObj;
Integer partition = partitionStruct.get(PARTITION_ID);
Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
errors.put(new TopicPartition(topic, partition), error);
}
public TxnOffsetCommitResponse(int requestThrottleMs, Map<TopicPartition, Errors> responseData) {
Map<String, TxnOffsetCommitResponseTopic> responseTopicDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, Errors> entry : responseData.entrySet()) {
TopicPartition topicPartition = entry.getKey();
String topicName = topicPartition.topic();
TxnOffsetCommitResponseTopic topic = responseTopicDataMap.getOrDefault(
topicName, new TxnOffsetCommitResponseTopic().setName(topicName));
topic.partitions().add(new TxnOffsetCommitResponsePartition()
.setErrorCode(entry.getValue().code())
.setPartitionIndex(topicPartition.partition())
);
responseTopicDataMap.put(topicName, topic);
}
this.errors = errors;
data = new TxnOffsetCommitResponseData()
.setTopics(new ArrayList<>(responseTopicDataMap.values()))
.setThrottleTimeMs(requestThrottleMs);
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.responseSchema(version));
struct.set(THROTTLE_TIME_MS, throttleTimeMs);
Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupPartitionDataByTopic(errors);
Object[] partitionsArray = new Object[mappedPartitions.size()];
int i = 0;
for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
Struct topicPartitionsStruct = struct.instance(TOPICS);
topicPartitionsStruct.set(TOPIC_NAME, topicAndPartitions.getKey());
Map<Integer, Errors> partitionAndErrors = topicAndPartitions.getValue();
Object[] partitionAndErrorsArray = new Object[partitionAndErrors.size()];
int j = 0;
for (Map.Entry<Integer, Errors> partitionAndError : partitionAndErrors.entrySet()) {
Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS);
partitionAndErrorStruct.set(PARTITION_ID, partitionAndError.getKey());
partitionAndErrorStruct.set(ERROR_CODE, partitionAndError.getValue().code());
partitionAndErrorsArray[j++] = partitionAndErrorStruct;
}
topicPartitionsStruct.set(PARTITIONS, partitionAndErrorsArray);
partitionsArray[i++] = topicPartitionsStruct;
}
struct.set(TOPICS, partitionsArray);
return struct;
return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
return throttleTimeMs;
}
public Map<TopicPartition, Errors> errors() {
return errors;
return data.throttleTimeMs();
}
@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(errors);
return errorCounts(errors());
}
public Map<TopicPartition, Errors> errors() {
Map<TopicPartition, Errors> errorMap = new HashMap<>();
for (TxnOffsetCommitResponseTopic topic : data.topics()) {
for (TxnOffsetCommitResponsePartition partition : topic.partitions()) {
errorMap.put(new TopicPartition(topic.name(), partition.partitionIndex()),
Errors.forCode(partition.errorCode()));
}
}
return errorMap;
}
public static TxnOffsetCommitResponse parse(ByteBuffer buffer, short version) {
return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer));
return new TxnOffsetCommitResponse(ApiKeys.TXN_OFFSET_COMMIT.parseResponse(version, buffer), version);
}
@Override
public String toString() {
return "TxnOffsetCommitResponse(" +
"errors=" + errors +
", throttleTimeMs=" + throttleTimeMs +
')';
return data.toString();
}
@Override

View File: TransactionManagerTest.java

@@ -2826,9 +2826,9 @@ public class TransactionManagerTest {
Map<TopicPartition, Errors> txnOffsetCommitResponse) {
client.prepareResponse(request -> {
TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest) request;
assertEquals(consumerGroupId, txnOffsetCommitRequest.consumerGroupId());
assertEquals(producerId, txnOffsetCommitRequest.producerId());
assertEquals(producerEpoch, txnOffsetCommitRequest.producerEpoch());
assertEquals(consumerGroupId, txnOffsetCommitRequest.data.groupId());
assertEquals(producerId, txnOffsetCommitRequest.data.producerId());
assertEquals(producerEpoch, txnOffsetCommitRequest.data.producerEpoch());
return true;
}, new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
}

View File: MessageTest.java

@@ -22,9 +22,17 @@ import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitio
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopic;
import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponsePartition;
import org.apache.kafka.common.message.OffsetFetchResponseData.OffsetFetchResponseTopic;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
@@ -271,6 +279,176 @@ public final class MessageTest {
testAllMessageRoundTripsFromVersion((short) 3, new LeaderAndIsrRequestData().setTopicStates(Collections.singletonList(partitionStateWithAddingRemovingReplicas)));
}
@Test
public void testOffsetCommitRequestVersions() throws Exception {
String groupId = "groupId";
String topicName = "topic";
String metadata = "metadata";
int partition = 2;
int offset = 100;
testAllMessageRoundTrips(new OffsetCommitRequestData()
.setGroupId(groupId)
.setTopics(Collections.singletonList(
new OffsetCommitRequestTopic()
.setName(topicName)
.setPartitions(Collections.singletonList(
new OffsetCommitRequestPartition()
.setPartitionIndex(partition)
.setCommittedMetadata(metadata)
.setCommittedOffset(offset)
)))));
Supplier<OffsetCommitRequestData> request =
() -> new OffsetCommitRequestData()
.setGroupId(groupId)
.setMemberId("memberId")
.setGroupInstanceId("instanceId")
.setTopics(Collections.singletonList(
new OffsetCommitRequestTopic()
.setName(topicName)
.setPartitions(Collections.singletonList(
new OffsetCommitRequestPartition()
.setPartitionIndex(partition)
.setCommittedLeaderEpoch(10)
.setCommittedMetadata(metadata)
.setCommittedOffset(offset)
.setCommitTimestamp(20)
))))
.setRetentionTimeMs(20);
for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) {
OffsetCommitRequestData requestData = request.get();
if (version < 1) {
requestData.setMemberId("");
requestData.setGenerationId(-1);
}
if (version != 1) {
requestData.topics().get(0).partitions().get(0).setCommitTimestamp(-1);
}
if (version < 2 || version > 4) {
requestData.setRetentionTimeMs(-1);
}
if (version < 6) {
requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
}
if (version < 7) {
requestData.setGroupInstanceId(null);
}
if (version == 1) {
testEquivalentMessageRoundTrip(version, requestData);
} else if (version >= 2 && version <= 4) {
testAllMessageRoundTripsBetweenVersions(version, (short) 4, requestData, requestData);
} else {
testAllMessageRoundTripsFromVersion(version, requestData);
}
}
}
@Test
public void testOffsetCommitResponseVersions() throws Exception {
Supplier<OffsetCommitResponseData> response =
() -> new OffsetCommitResponseData()
.setTopics(
singletonList(
new OffsetCommitResponseTopic()
.setName("topic")
.setPartitions(singletonList(
new OffsetCommitResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
))
)
)
.setThrottleTimeMs(20);
for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) {
OffsetCommitResponseData responseData = response.get();
if (version < 3) {
responseData.setThrottleTimeMs(0);
}
testAllMessageRoundTripsFromVersion(version, responseData);
}
}
@Test
public void testTxnOffsetCommitRequestVersions() throws Exception {
String groupId = "groupId";
String topicName = "topic";
String metadata = "metadata";
String txnId = "transactionalId";
int producerId = 25;
short producerEpoch = 10;
int partition = 2;
int offset = 100;
testAllMessageRoundTrips(new TxnOffsetCommitRequestData()
.setGroupId(groupId)
.setTransactionalId(txnId)
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setTopics(Collections.singletonList(
new TxnOffsetCommitRequestTopic()
.setName(topicName)
.setPartitions(Collections.singletonList(
new TxnOffsetCommitRequestPartition()
.setPartitionIndex(partition)
.setCommittedMetadata(metadata)
.setCommittedOffset(offset)
)))));
Supplier<TxnOffsetCommitRequestData> request =
() -> new TxnOffsetCommitRequestData()
.setGroupId(groupId)
.setTransactionalId(txnId)
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setTopics(Collections.singletonList(
new TxnOffsetCommitRequestTopic()
.setName(topicName)
.setPartitions(Collections.singletonList(
new TxnOffsetCommitRequestPartition()
.setPartitionIndex(partition)
.setCommittedLeaderEpoch(10)
.setCommittedMetadata(metadata)
.setCommittedOffset(offset)
))));
for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) {
TxnOffsetCommitRequestData requestData = request.get();
if (version < 6) {
requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
}
testAllMessageRoundTripsFromVersion(version, requestData);
}
}
@Test
public void testTxnOffsetCommitResponseVersions() throws Exception {
testAllMessageRoundTrips(
new TxnOffsetCommitResponseData()
.setTopics(
singletonList(
new TxnOffsetCommitResponseTopic()
.setName("topic")
.setPartitions(singletonList(
new TxnOffsetCommitResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
))
)
)
.setThrottleTimeMs(20));
}
@Test
public void testOffsetFetchVersions() throws Exception {
String groupId = "groupId";

View File: OffsetCommitRequestTest.java

@@ -16,19 +16,131 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
import static org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponseTopics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
public class OffsetCommitRequestTest {
@Test(expected = UnsupportedVersionException.class)
public void testRequestVersionCompatibilityFailBuild() {
new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId("groupId")
.setMemberId("consumerId")
.setGroupInstanceId("groupInstanceId")
).build((short) 6);
protected static String groupId = "groupId";
protected static String topicOne = "topicOne";
protected static String topicTwo = "topicTwo";
protected static int partitionOne = 1;
protected static int partitionTwo = 2;
protected static long offset = 100L;
protected static short leaderEpoch = 20;
protected static String metadata = "metadata";
protected static int throttleTimeMs = 10;
private static OffsetCommitRequestData data;
private static List<OffsetCommitRequestTopic> topics;
@Before
public void setUp() {
topics = Arrays.asList(
new OffsetCommitRequestTopic()
.setName(topicOne)
.setPartitions(Collections.singletonList(
new OffsetCommitRequestPartition()
.setPartitionIndex(partitionOne)
.setCommittedOffset(offset)
.setCommittedLeaderEpoch(leaderEpoch)
.setCommittedMetadata(metadata)
)),
new OffsetCommitRequestTopic()
.setName(topicTwo)
.setPartitions(Collections.singletonList(
new OffsetCommitRequestPartition()
.setPartitionIndex(partitionTwo)
.setCommittedOffset(offset)
.setCommittedLeaderEpoch(leaderEpoch)
.setCommittedMetadata(metadata)
))
);
data = new OffsetCommitRequestData()
.setGroupId(groupId)
.setTopics(topics);
}
@Test
public void testConstructor() {
Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
expectedOffsets.put(new TopicPartition(topicOne, partitionOne), offset);
expectedOffsets.put(new TopicPartition(topicTwo, partitionTwo), offset);
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) {
OffsetCommitRequest request = builder.build(version);
assertEquals(expectedOffsets, request.offsets());
OffsetCommitResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception());
assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), response.errorCounts());
if (version >= 3) {
assertEquals(throttleTimeMs, response.throttleTimeMs());
} else {
assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
}
}
}
@Test
public void testGetErrorResponseTopics() {
List<OffsetCommitResponseTopic> expectedTopics = Arrays.asList(
new OffsetCommitResponseTopic()
.setName(topicOne)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionOne))),
new OffsetCommitResponseTopic()
.setName(topicTwo)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionTwo)))
);
assertEquals(expectedTopics, getErrorResponseTopics(topics, Errors.UNKNOWN_MEMBER_ID));
}
@Test
public void testVersionSupportForGroupInstanceId() {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId("groupId")
.setMemberId("consumerId")
.setGroupInstanceId("groupInstanceId")
);
for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) {
if (version >= 7) {
builder.build(version);
} else {
final short finalVersion = version;
assertThrows(UnsupportedVersionException.class, () -> builder.build(finalVersion));
}
}
}
}

View File: OffsetCommitResponseTest.java

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
import static org.junit.Assert.assertEquals;
public class OffsetCommitResponseTest {
protected final int throttleTimeMs = 10;
protected final String topicOne = "topic1";
protected final int partitionOne = 1;
protected final Errors errorOne = Errors.COORDINATOR_NOT_AVAILABLE;
protected final Errors errorTwo = Errors.NOT_COORDINATOR;
protected final String topicTwo = "topic2";
protected final int partitionTwo = 2;
protected TopicPartition tp1 = new TopicPartition(topicOne, partitionOne);
protected TopicPartition tp2 = new TopicPartition(topicTwo, partitionTwo);
protected Map<Errors, Integer> expectedErrorCounts;
protected Map<TopicPartition, Errors> errorsMap;
@Before
public void setUp() {
expectedErrorCounts = new HashMap<>();
expectedErrorCounts.put(errorOne, 1);
expectedErrorCounts.put(errorTwo, 1);
errorsMap = new HashMap<>();
errorsMap.put(tp1, errorOne);
errorsMap.put(tp2, errorTwo);
}
@Test
public void testConstructorWithErrorResponse() {
OffsetCommitResponse response = new OffsetCommitResponse(throttleTimeMs, errorsMap);
assertEquals(expectedErrorCounts, response.errorCounts());
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
@Test
public void testConstructorWithStruct() {
OffsetCommitResponseData data = new OffsetCommitResponseData()
.setTopics(Arrays.asList(
new OffsetCommitResponseTopic().setPartitions(
Collections.singletonList(new OffsetCommitResponsePartition()
.setPartitionIndex(partitionOne)
.setErrorCode(errorOne.code()))),
new OffsetCommitResponseTopic().setPartitions(
Collections.singletonList(new OffsetCommitResponsePartition()
.setPartitionIndex(partitionTwo)
.setErrorCode(errorTwo.code())))
))
.setThrottleTimeMs(throttleTimeMs);
for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) {
OffsetCommitResponse response = new OffsetCommitResponse(data.toStruct(version), version);
assertEquals(expectedErrorCounts, response.errorCounts());
if (version >= 3) {
assertEquals(throttleTimeMs, response.throttleTimeMs());
} else {
assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
}
assertEquals(version >= 4, response.shouldClientThrottle(version));
}
}
}

View File: RequestResponseTest.java

@@ -88,6 +88,7 @@ import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -1384,7 +1385,14 @@ public class RequestResponseTest {
new TxnOffsetCommitRequest.CommittedOffset(100, null, Optional.empty()));
offsets.put(new TopicPartition("topic", 74),
new TxnOffsetCommitRequest.CommittedOffset(100, "blah", Optional.of(27)));
return new TxnOffsetCommitRequest.Builder("transactionalId", "groupId", 21L, (short) 42, offsets).build();
return new TxnOffsetCommitRequest.Builder(
new TxnOffsetCommitRequestData()
.setTransactionalId("transactionalId")
.setGroupId("groupId")
.setProducerId(21L)
.setProducerEpoch((short) 42)
.setTopics(TxnOffsetCommitRequest.getTopics(offsets))
).build();
}
private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {

View File: TxnOffsetCommitRequestTest.java

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest {
private static String transactionalId = "transactionalId";
private static int producerId = 10;
private static short producerEpoch = 1;
private static TxnOffsetCommitRequestData data;
@Before
@Override
public void setUp() {
super.setUp();
data = new TxnOffsetCommitRequestData()
.setGroupId(groupId)
.setTransactionalId(transactionalId)
.setProducerId(producerId)
.setProducerEpoch(producerEpoch)
.setTopics(Arrays.asList(
new TxnOffsetCommitRequestTopic()
.setName(topicOne)
.setPartitions(Collections.singletonList(
new TxnOffsetCommitRequestPartition()
.setPartitionIndex(partitionOne)
.setCommittedOffset(offset)
.setCommittedLeaderEpoch(leaderEpoch)
.setCommittedMetadata(metadata)
)),
new TxnOffsetCommitRequestTopic()
.setName(topicTwo)
.setPartitions(Collections.singletonList(
new TxnOffsetCommitRequestPartition()
.setPartitionIndex(partitionTwo)
.setCommittedOffset(offset)
.setCommittedLeaderEpoch(leaderEpoch)
.setCommittedMetadata(metadata)
))
));
}
@Test
@Override
public void testConstructor() {
Map<TopicPartition, CommittedOffset> expectedOffsets = new HashMap<>();
expectedOffsets.put(new TopicPartition(topicOne, partitionOne),
new CommittedOffset(
offset,
metadata,
Optional.of((int) leaderEpoch)));
expectedOffsets.put(new TopicPartition(topicTwo, partitionTwo),
new CommittedOffset(
offset,
metadata,
Optional.of((int) leaderEpoch)));
TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(data);
Map<TopicPartition, Errors> errorsMap = new HashMap<>();
errorsMap.put(new TopicPartition(topicOne, partitionOne), Errors.NOT_COORDINATOR);
errorsMap.put(new TopicPartition(topicTwo, partitionTwo), Errors.NOT_COORDINATOR);
for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) {
TxnOffsetCommitRequest request = builder.build(version);
assertEquals(expectedOffsets, request.offsets());
assertEquals(data.topics(), TxnOffsetCommitRequest.getTopics(request.offsets()));
TxnOffsetCommitResponse response =
request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception());
assertEquals(errorsMap, response.errors());
assertEquals(Collections.singletonMap(Errors.NOT_COORDINATOR, 2), response.errorCounts());
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
}
}

View File: TxnOffsetCommitResponseTest.java

@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
public class TxnOffsetCommitResponseTest extends OffsetCommitResponseTest {
@Test
@Override
public void testConstructorWithErrorResponse() {
TxnOffsetCommitResponse response = new TxnOffsetCommitResponse(throttleTimeMs, errorsMap);
assertEquals(errorsMap, response.errors());
assertEquals(expectedErrorCounts, response.errorCounts());
assertEquals(throttleTimeMs, response.throttleTimeMs());
}
@Test
@Override
public void testConstructorWithStruct() {
TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setTopics(Arrays.asList(
new TxnOffsetCommitResponseTopic().setPartitions(
Collections.singletonList(new TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionOne)
.setErrorCode(errorOne.code()))),
new TxnOffsetCommitResponseTopic().setPartitions(
Collections.singletonList(new TxnOffsetCommitResponsePartition()
.setPartitionIndex(partitionTwo)
.setErrorCode(errorTwo.code()))
)
));
for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) {
TxnOffsetCommitResponse response = new TxnOffsetCommitResponse(data.toStruct(version), version);
assertEquals(expectedErrorCounts, response.errorCounts());
assertEquals(throttleTimeMs, response.throttleTimeMs());
assertEquals(version >= 1, response.shouldClientThrottle(version));
}
}
}

View File: KafkaApis.scala

@@ -422,11 +422,16 @@ class KafkaApis(val requestChannel: RequestChannel,
val metadata = if (partitionData.committedMetadata() == null)
OffsetAndMetadata.NoMetadata
else
partitionData.committedMetadata()
partitionData.committedMetadata
val leaderEpochOpt = if (partitionData.committedLeaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
Optional.empty[Integer]
else
Optional.of[Integer](partitionData.committedLeaderEpoch)
k -> new OffsetAndMetadata(
offset = partitionData.committedOffset(),
leaderEpoch = Optional.ofNullable[Integer](partitionData.committedLeaderEpoch),
leaderEpoch = leaderEpochOpt,
metadata = metadata,
commitTimestamp = partitionData.commitTimestamp() match {
case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
@@ -2087,9 +2092,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// authorize for the transactionalId and the consumer group. Note that we skip producerId authorization
// since it is implied by transactionalId authorization
if (!authorize(request, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.transactionalId))
if (!authorize(request, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId))
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
else if (!authorize(request, READ, GROUP, txnOffsetCommitRequest.consumerGroupId))
else if (!authorize(request, READ, GROUP, txnOffsetCommitRequest.data.groupId))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
else {
val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
@@ -2125,9 +2130,9 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
val offsetMetadata = convertTxnOffsets(authorizedTopicCommittedOffsets.toMap)
groupCoordinator.handleTxnCommitOffsets(
txnOffsetCommitRequest.consumerGroupId,
txnOffsetCommitRequest.producerId,
txnOffsetCommitRequest.producerEpoch,
txnOffsetCommitRequest.data.groupId,
txnOffsetCommitRequest.data.producerId,
txnOffsetCommitRequest.data.producerEpoch,
offsetMetadata,
sendResponseCallback)
}

View File: KafkaApisTest.scala

@@ -47,7 +47,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.easymock.{Capture, EasyMock, IAnswer}
import EasyMock._
import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData, TxnOffsetCommitRequestData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.replica.ClientMetadata
@@ -161,8 +161,15 @@ class KafkaApisTest {
val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
val (offsetCommitRequest, request) = buildRequest(new TxnOffsetCommitRequest.Builder("txnlId", "groupId",
15L, 0.toShort, Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
val (offsetCommitRequest, request) = buildRequest(new TxnOffsetCommitRequest.Builder(
new TxnOffsetCommitRequestData()
.setTransactionalId("txnlId")
.setGroupId("groupId")
.setProducerId(15L)
.setProducerEpoch(0.toShort)
.setTopics(TxnOffsetCommitRequest.getTopics(
Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
))
val capturedResponse = expectNoThrottling()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)

View File: RequestQuotaTest.scala

@@ -379,8 +379,16 @@ class RequestQuotaTest extends BaseRequestTest {
new WriteTxnMarkersRequest.Builder(List.empty.asJava)
case ApiKeys.TXN_OFFSET_COMMIT =>
new TxnOffsetCommitRequest.Builder("test-transactional-id", "test-txn-group", 2, 0,
Map.empty[TopicPartition, TxnOffsetCommitRequest.CommittedOffset].asJava)
new TxnOffsetCommitRequest.Builder(
new TxnOffsetCommitRequestData()
.setTransactionalId("test-transactional-id")
.setGroupId("test-txn-group")
.setProducerId(2)
.setProducerEpoch(0)
.setTopics(TxnOffsetCommitRequest.getTopics(
Map.empty[TopicPartition, TxnOffsetCommitRequest.CommittedOffset].asJava
))
)
case ApiKeys.DESCRIBE_ACLS =>
new DescribeAclsRequest.Builder(AclBindingFilter.ANY)
@@ -550,7 +558,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs
case ApiKeys.ADD_OFFSETS_TO_TXN => new AddOffsetsToTxnResponse(response).throttleTimeMs
case ApiKeys.END_TXN => new EndTxnResponse(response).throttleTimeMs
case ApiKeys.TXN_OFFSET_COMMIT => new TxnOffsetCommitResponse(response).throttleTimeMs
case ApiKeys.TXN_OFFSET_COMMIT => new TxnOffsetCommitResponse(response, ApiKeys.TXN_OFFSET_COMMIT.latestVersion).throttleTimeMs
case ApiKeys.DESCRIBE_ACLS => new DescribeAclsResponse(response).throttleTimeMs
case ApiKeys.CREATE_ACLS => new CreateAclsResponse(response).throttleTimeMs
case ApiKeys.DELETE_ACLS => new DeleteAclsResponse(response).throttleTimeMs