MINOR: make sure all generated data tests cover all versions (#10078)

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Chia-Ping Tsai 2021-03-05 00:22:57 +08:00 committed by GitHub
parent 1fd75bf1ed
commit be1476869f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 155 additions and 161 deletions

View File

@ -201,7 +201,7 @@ public enum ApiKeys {
public List<Short> allVersions() { public List<Short> allVersions() {
List<Short> versions = new ArrayList<>(latestVersion() - oldestVersion() + 1); List<Short> versions = new ArrayList<>(latestVersion() - oldestVersion() + 1);
for (short version = oldestVersion(); version < latestVersion(); version++) { for (short version = oldestVersion(); version <= latestVersion(); version++) {
versions.add(version); versions.add(version);
} }
return versions; return versions;

View File

@ -186,7 +186,7 @@ public final class MessageTest {
.setPartitions(Collections.singletonList(partition))); .setPartitions(Collections.singletonList(partition)));
Supplier<ListOffsetsResponseData> response = () -> new ListOffsetsResponseData() Supplier<ListOffsetsResponseData> response = () -> new ListOffsetsResponseData()
.setTopics(topics); .setTopics(topics);
for (short version = 0; version <= ApiKeys.LIST_OFFSETS.latestVersion(); version++) { for (short version : ApiKeys.LIST_OFFSETS.allVersions()) {
ListOffsetsResponseData responseData = response.get(); ListOffsetsResponseData responseData = response.get();
if (version > 0) { if (version > 0) {
responseData.topics().get(0).partitions().get(0) responseData.topics().get(0).partitions().get(0)
@ -459,7 +459,7 @@ public final class MessageTest {
)))) ))))
.setRetentionTimeMs(20); .setRetentionTimeMs(20);
for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
OffsetCommitRequestData requestData = request.get(); OffsetCommitRequestData requestData = request.get();
if (version < 1) { if (version < 1) {
requestData.setMemberId(""); requestData.setMemberId("");
@ -485,7 +485,7 @@ public final class MessageTest {
if (version == 1) { if (version == 1) {
testEquivalentMessageRoundTrip(version, requestData); testEquivalentMessageRoundTrip(version, requestData);
} else if (version >= 2 && version <= 4) { } else if (version >= 2 && version <= 4) {
testAllMessageRoundTripsBetweenVersions(version, (short) 4, requestData, requestData); testAllMessageRoundTripsBetweenVersions(version, (short) 5, requestData, requestData);
} else { } else {
testAllMessageRoundTripsFromVersion(version, requestData); testAllMessageRoundTripsFromVersion(version, requestData);
} }
@ -509,7 +509,7 @@ public final class MessageTest {
) )
.setThrottleTimeMs(20); .setThrottleTimeMs(20);
for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
OffsetCommitResponseData responseData = response.get(); OffsetCommitResponseData responseData = response.get();
if (version < 3) { if (version < 3) {
responseData.setThrottleTimeMs(0); responseData.setThrottleTimeMs(0);
@ -568,7 +568,7 @@ public final class MessageTest {
.setCommittedOffset(offset) .setCommittedOffset(offset)
)))); ))));
for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) { for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
TxnOffsetCommitRequestData requestData = request.get(); TxnOffsetCommitRequestData requestData = request.get();
if (version < 2) { if (version < 2) {
requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1); requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
@ -632,7 +632,7 @@ public final class MessageTest {
.setTopics(topics) .setTopics(topics)
.setRequireStable(true); .setRequireStable(true);
for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
final short finalVersion = version; final short finalVersion = version;
if (version < 2) { if (version < 2) {
assertThrows(NullPointerException.class, () -> testAllMessageRoundTripsFromVersion(finalVersion, allPartitionData)); assertThrows(NullPointerException.class, () -> testAllMessageRoundTripsFromVersion(finalVersion, allPartitionData));
@ -661,7 +661,7 @@ public final class MessageTest {
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()))))) .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())))))
.setErrorCode(Errors.NOT_COORDINATOR.code()) .setErrorCode(Errors.NOT_COORDINATOR.code())
.setThrottleTimeMs(10); .setThrottleTimeMs(10);
for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
OffsetFetchResponseData responseData = response.get(); OffsetFetchResponseData responseData = response.get();
if (version <= 1) { if (version <= 1) {
responseData.setErrorCode(Errors.NONE.code()); responseData.setErrorCode(Errors.NONE.code());
@ -720,7 +720,7 @@ public final class MessageTest {
.setErrorMessage(errorMessage)))).iterator())) .setErrorMessage(errorMessage)))).iterator()))
.setThrottleTimeMs(throttleTimeMs); .setThrottleTimeMs(throttleTimeMs);
for (short version = 0; version <= ApiKeys.PRODUCE.latestVersion(); version++) { for (short version : ApiKeys.PRODUCE.allVersions()) {
ProduceResponseData responseData = response.get(); ProduceResponseData responseData = response.get();
if (version < 8) { if (version < 8) {
@ -741,9 +741,9 @@ public final class MessageTest {
} }
if (version >= 3 && version <= 4) { if (version >= 3 && version <= 4) {
testAllMessageRoundTripsBetweenVersions(version, (short) 4, responseData, responseData); testAllMessageRoundTripsBetweenVersions(version, (short) 5, responseData, responseData);
} else if (version >= 6 && version <= 7) { } else if (version >= 6 && version <= 7) {
testAllMessageRoundTripsBetweenVersions(version, (short) 7, responseData, responseData); testAllMessageRoundTripsBetweenVersions(version, (short) 8, responseData, responseData);
} else { } else {
testEquivalentMessageRoundTrip(version, responseData); testEquivalentMessageRoundTrip(version, responseData);
} }
@ -924,8 +924,8 @@ public final class MessageTest {
@Test @Test
public void testWriteNullForNonNullableFieldRaisesException() { public void testWriteNullForNonNullableFieldRaisesException() {
CreateTopicsRequestData createTopics = new CreateTopicsRequestData().setTopics(null); CreateTopicsRequestData createTopics = new CreateTopicsRequestData().setTopics(null);
for (short i = (short) 0; i <= createTopics.highestSupportedVersion(); i++) { for (short version : ApiKeys.CREATE_TOPICS.allVersions()) {
verifyWriteRaisesNpe(i, createTopics); verifyWriteRaisesNpe(version, createTopics);
} }
MetadataRequestData metadata = new MetadataRequestData().setTopics(null); MetadataRequestData metadata = new MetadataRequestData().setTopics(null);
verifyWriteRaisesNpe((short) 0, metadata); verifyWriteRaisesNpe((short) 0, metadata);

View File

@ -43,7 +43,7 @@ public class AddPartitionsToTxnRequestTest {
AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId, producerId, producerEpoch, partitions); AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId, producerId, producerEpoch, partitions);
for (short version = 0; version <= ApiKeys.ADD_PARTITIONS_TO_TXN.latestVersion(); version++) { for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) {
AddPartitionsToTxnRequest request = builder.build(version); AddPartitionsToTxnRequest request = builder.build(version);
assertEquals(transactionalId, request.data().transactionalId()); assertEquals(transactionalId, request.data().transactionalId());

View File

@ -89,7 +89,7 @@ public class AddPartitionsToTxnResponseTest {
.setThrottleTimeMs(throttleTimeMs); .setThrottleTimeMs(throttleTimeMs);
AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(data); AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(data);
for (short version = 0; version <= ApiKeys.ADD_PARTITIONS_TO_TXN.latestVersion(); version++) { for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) {
AddPartitionsToTxnResponse parsedResponse = AddPartitionsToTxnResponse.parse(response.serialize(version), version); AddPartitionsToTxnResponse parsedResponse = AddPartitionsToTxnResponse.parse(response.serialize(version), version);
assertEquals(expectedErrorCounts, parsedResponse.errorCounts()); assertEquals(expectedErrorCounts, parsedResponse.errorCounts());
assertEquals(throttleTimeMs, parsedResponse.throttleTimeMs()); assertEquals(throttleTimeMs, parsedResponse.throttleTimeMs());

View File

@ -38,7 +38,7 @@ public class ControlledShutdownRequestTest {
@Test @Test
public void testGetErrorResponse() { public void testGetErrorResponse() {
for (short version = CONTROLLED_SHUTDOWN.oldestVersion(); version < CONTROLLED_SHUTDOWN.latestVersion(); version++) { for (short version : CONTROLLED_SHUTDOWN.allVersions()) {
ControlledShutdownRequest.Builder builder = new ControlledShutdownRequest.Builder( ControlledShutdownRequest.Builder builder = new ControlledShutdownRequest.Builder(
new ControlledShutdownRequestData().setBrokerId(1), version); new ControlledShutdownRequestData().setBrokerId(1), version);
ControlledShutdownRequest request = builder.build(); ControlledShutdownRequest request = builder.build();

View File

@ -41,7 +41,7 @@ public class EndTxnRequestTest {
.setProducerId(producerId) .setProducerId(producerId)
.setTransactionalId(transactionId)); .setTransactionalId(transactionId));
for (short version = 0; version <= ApiKeys.END_TXN.latestVersion(); version++) { for (short version : ApiKeys.END_TXN.allVersions()) {
EndTxnRequest request = builder.build(version); EndTxnRequest request = builder.build(version);
EndTxnResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception()); EndTxnResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception());

View File

@ -38,7 +38,7 @@ public class EndTxnResponseTest {
Map<Errors, Integer> expectedErrorCounts = Collections.singletonMap(Errors.NOT_COORDINATOR, 1); Map<Errors, Integer> expectedErrorCounts = Collections.singletonMap(Errors.NOT_COORDINATOR, 1);
for (short version = 0; version <= ApiKeys.END_TXN.latestVersion(); version++) { for (short version : ApiKeys.END_TXN.allVersions()) {
EndTxnResponse response = new EndTxnResponse(data); EndTxnResponse response = new EndTxnResponse(data);
assertEquals(expectedErrorCounts, response.errorCounts()); assertEquals(expectedErrorCounts, response.errorCounts());
assertEquals(throttleTimeMs, response.throttleTimeMs()); assertEquals(throttleTimeMs, response.throttleTimeMs());

View File

@ -46,7 +46,7 @@ public class EnvelopeRequestTest {
@Test @Test
public void testToSend() throws IOException { public void testToSend() throws IOException {
for (short version = ApiKeys.ENVELOPE.oldestVersion(); version <= ApiKeys.ENVELOPE.latestVersion(); version++) { for (short version : ApiKeys.ENVELOPE.allVersions()) {
ByteBuffer requestData = ByteBuffer.wrap("foobar".getBytes()); ByteBuffer requestData = ByteBuffer.wrap("foobar".getBytes());
RequestHeader header = new RequestHeader(ApiKeys.ENVELOPE, version, "clientId", 15); RequestHeader header = new RequestHeader(ApiKeys.ENVELOPE, version, "clientId", 15);
EnvelopeRequest request = new EnvelopeRequest.Builder( EnvelopeRequest request = new EnvelopeRequest.Builder(

View File

@ -32,7 +32,7 @@ class EnvelopeResponseTest {
@Test @Test
public void testToSend() { public void testToSend() {
for (short version = ApiKeys.ENVELOPE.oldestVersion(); version <= ApiKeys.ENVELOPE.latestVersion(); version++) { for (short version : ApiKeys.ENVELOPE.allVersions()) {
ByteBuffer responseData = ByteBuffer.wrap("foobar".getBytes()); ByteBuffer responseData = ByteBuffer.wrap("foobar".getBytes());
EnvelopeResponse response = new EnvelopeResponse(responseData, Errors.NONE); EnvelopeResponse response = new EnvelopeResponse(responseData, Errors.NONE);
short headerVersion = ApiKeys.ENVELOPE.responseHeaderVersion(version); short headerVersion = ApiKeys.ENVELOPE.responseHeaderVersion(version);

View File

@ -64,8 +64,7 @@ public class LeaderAndIsrRequestTest {
Uuid topicId = Uuid.randomUuid(); Uuid topicId = Uuid.randomUuid();
String topicName = "topic"; String topicName = "topic";
int partition = 0; int partition = 0;
for (short version : LEADER_AND_ISR.allVersions()) {
for (short version = LEADER_AND_ISR.oldestVersion(); version <= LEADER_AND_ISR.latestVersion(); version++) {
LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(version, 0, 0, 0, LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(version, 0, 0, 0,
Collections.singletonList(new LeaderAndIsrPartitionState() Collections.singletonList(new LeaderAndIsrPartitionState()
.setTopicName(topicName) .setTopicName(topicName)
@ -108,7 +107,7 @@ public class LeaderAndIsrRequestTest {
*/ */
@Test @Test
public void testVersionLogic() { public void testVersionLogic() {
for (short version = LEADER_AND_ISR.oldestVersion(); version <= LEADER_AND_ISR.latestVersion(); version++) { for (short version : LEADER_AND_ISR.allVersions()) {
List<LeaderAndIsrPartitionState> partitionStates = asList( List<LeaderAndIsrPartitionState> partitionStates = asList(
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
.setTopicName("topic0") .setTopicName("topic0")

View File

@ -71,7 +71,7 @@ public class LeaderAndIsrResponseTest {
@Test @Test
public void testErrorCountsWithTopLevelError() { public void testErrorCountsWithTopLevelError() {
for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { for (short version : LEADER_AND_ISR.allVersions()) {
LeaderAndIsrResponse response; LeaderAndIsrResponse response;
if (version < 5) { if (version < 5) {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo", List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
@ -92,7 +92,7 @@ public class LeaderAndIsrResponseTest {
@Test @Test
public void testErrorCountsNoTopLevelError() { public void testErrorCountsNoTopLevelError() {
for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { for (short version : LEADER_AND_ISR.allVersions()) {
LeaderAndIsrResponse response; LeaderAndIsrResponse response;
if (version < 5) { if (version < 5) {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo", List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
@ -116,7 +116,7 @@ public class LeaderAndIsrResponseTest {
@Test @Test
public void testToString() { public void testToString() {
for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { for (short version : LEADER_AND_ISR.allVersions()) {
LeaderAndIsrResponse response; LeaderAndIsrResponse response;
if (version < 5) { if (version < 5) {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo", List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",

View File

@ -67,7 +67,7 @@ public class LeaveGroupRequestTest {
.setGroupId(groupId) .setGroupId(groupId)
.setMembers(members); .setMembers(members);
for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { for (short version : ApiKeys.LEAVE_GROUP.allVersions()) {
try { try {
LeaveGroupRequest request = builder.build(version); LeaveGroupRequest request = builder.build(version);
if (version <= 2) { if (version <= 2) {

View File

@ -67,7 +67,7 @@ public class LeaveGroupResponseTest {
expectedErrorCounts.put(Errors.UNKNOWN_MEMBER_ID, 1); expectedErrorCounts.put(Errors.UNKNOWN_MEMBER_ID, 1);
expectedErrorCounts.put(Errors.FENCED_INSTANCE_ID, 1); expectedErrorCounts.put(Errors.FENCED_INSTANCE_ID, 1);
for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { for (short version : ApiKeys.LEAVE_GROUP.allVersions()) {
LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(memberResponses, LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(memberResponses,
Errors.NONE, Errors.NONE,
throttleTimeMs, throttleTimeMs,
@ -95,7 +95,7 @@ public class LeaveGroupResponseTest {
@Test @Test
public void testShouldThrottle() { public void testShouldThrottle() {
LeaveGroupResponse response = new LeaveGroupResponse(new LeaveGroupResponseData()); LeaveGroupResponse response = new LeaveGroupResponse(new LeaveGroupResponseData());
for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { for (short version : ApiKeys.LEAVE_GROUP.allVersions()) {
if (version >= 2) { if (version >= 2) {
assertTrue(response.shouldClientThrottle(version)); assertTrue(response.shouldClientThrottle(version));
} else { } else {
@ -109,7 +109,7 @@ public class LeaveGroupResponseTest {
LeaveGroupResponseData responseData = new LeaveGroupResponseData() LeaveGroupResponseData responseData = new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code()) .setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(throttleTimeMs); .setThrottleTimeMs(throttleTimeMs);
for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { for (short version : ApiKeys.LEAVE_GROUP.allVersions()) {
LeaveGroupResponse primaryResponse = LeaveGroupResponse.parse( LeaveGroupResponse primaryResponse = LeaveGroupResponse.parse(
MessageUtil.toByteBuffer(responseData, version), version); MessageUtil.toByteBuffer(responseData, version), version);
LeaveGroupResponse secondaryResponse = LeaveGroupResponse.parse( LeaveGroupResponse secondaryResponse = LeaveGroupResponse.parse(
@ -129,7 +129,7 @@ public class LeaveGroupResponseTest {
.setErrorCode(Errors.NOT_COORDINATOR.code()) .setErrorCode(Errors.NOT_COORDINATOR.code())
.setThrottleTimeMs(throttleTimeMs); .setThrottleTimeMs(throttleTimeMs);
for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { for (short version : ApiKeys.LEAVE_GROUP.allVersions()) {
ByteBuffer buffer = MessageUtil.toByteBuffer(data, version); ByteBuffer buffer = MessageUtil.toByteBuffer(data, version);
LeaveGroupResponse leaveGroupResponse = LeaveGroupResponse.parse(buffer, version); LeaveGroupResponse leaveGroupResponse = LeaveGroupResponse.parse(buffer, version);
assertEquals(expectedErrorCounts, leaveGroupResponse.errorCounts()); assertEquals(expectedErrorCounts, leaveGroupResponse.errorCounts());
@ -146,7 +146,7 @@ public class LeaveGroupResponseTest {
@Test @Test
public void testEqualityWithMemberResponses() { public void testEqualityWithMemberResponses() {
for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { for (short version : ApiKeys.LEAVE_GROUP.allVersions()) {
List<MemberResponse> localResponses = version > 2 ? memberResponses : memberResponses.subList(0, 1); List<MemberResponse> localResponses = version > 2 ? memberResponses : memberResponses.subList(0, 1);
LeaveGroupResponse primaryResponse = new LeaveGroupResponse(localResponses, LeaveGroupResponse primaryResponse = new LeaveGroupResponse(localResponses,
Errors.NONE, Errors.NONE,

View File

@ -91,7 +91,7 @@ public class OffsetCommitRequestTest {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data); OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) { for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
OffsetCommitRequest request = builder.build(version); OffsetCommitRequest request = builder.build(version);
assertEquals(expectedOffsets, request.offsets()); assertEquals(expectedOffsets, request.offsets());
@ -130,7 +130,7 @@ public class OffsetCommitRequestTest {
.setGroupInstanceId(groupInstanceId) .setGroupInstanceId(groupInstanceId)
); );
for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
if (version >= 7) { if (version >= 7) {
builder.build(version); builder.build(version);
} else { } else {

View File

@ -85,7 +85,7 @@ public class OffsetCommitResponseTest {
)) ))
.setThrottleTimeMs(throttleTimeMs); .setThrottleTimeMs(throttleTimeMs);
for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
ByteBuffer buffer = MessageUtil.toByteBuffer(data, version); ByteBuffer buffer = MessageUtil.toByteBuffer(data, version);
OffsetCommitResponse response = OffsetCommitResponse.parse(buffer, version); OffsetCommitResponse response = OffsetCommitResponse.parse(buffer, version);
assertEquals(expectedErrorCounts, response.errorCounts()); assertEquals(expectedErrorCounts, response.errorCounts());

View File

@ -75,7 +75,7 @@ public class OffsetFetchRequestTest {
)); ));
} }
for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
OffsetFetchRequest request = builder.build(version); OffsetFetchRequest request = builder.build(version);
assertFalse(request.isAllPartitions()); assertFalse(request.isAllPartitions());
assertEquals(groupId, request.groupId()); assertEquals(groupId, request.groupId());
@ -101,7 +101,7 @@ public class OffsetFetchRequestTest {
@Test @Test
public void testConstructorFailForUnsupportedRequireStable() { public void testConstructorFailForUnsupportedRequireStable() {
for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
// The builder needs to be initialized every cycle as the internal data `requireStable` flag is flipped. // The builder needs to be initialized every cycle as the internal data `requireStable` flag is flipped.
builder = new OffsetFetchRequest.Builder(groupId, true, null, false); builder = new OffsetFetchRequest.Builder(groupId, true, null, false);
final short finalVersion = version; final short finalVersion = version;
@ -123,7 +123,7 @@ public class OffsetFetchRequestTest {
@Test @Test
public void testBuildThrowForUnsupportedRequireStable() { public void testBuildThrowForUnsupportedRequireStable() {
for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
builder = new OffsetFetchRequest.Builder(groupId, true, null, true); builder = new OffsetFetchRequest.Builder(groupId, true, null, true);
if (version < 7) { if (version < 7) {
final short finalVersion = version; final short finalVersion = version;

View File

@ -103,7 +103,7 @@ public class OffsetFetchResponseTest {
OffsetFetchResponse latestResponse = new OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap); OffsetFetchResponse latestResponse = new OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
OffsetFetchResponseData data = new OffsetFetchResponseData( OffsetFetchResponseData data = new OffsetFetchResponseData(
new ByteBufferAccessor(latestResponse.serialize(version)), version); new ByteBufferAccessor(latestResponse.serialize(version)), version);
@ -154,7 +154,7 @@ public class OffsetFetchResponseTest {
@Test @Test
public void testShouldThrottle() { public void testShouldThrottle() {
OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap); OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap);
for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
if (version >= 4) { if (version >= 4) {
assertTrue(response.shouldClientThrottle(version)); assertTrue(response.shouldClientThrottle(version));
} else { } else {

View File

@ -34,15 +34,15 @@ public class OffsetsForLeaderEpochRequestTest {
assertThrows(UnsupportedVersionException.class, () -> builder.build(v)); assertThrows(UnsupportedVersionException.class, () -> builder.build(v));
} }
for (short version = 3; version < ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) { for (short version = 3; version <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) {
OffsetsForLeaderEpochRequest request = builder.build((short) 3); OffsetsForLeaderEpochRequest request = builder.build(version);
assertEquals(OffsetsForLeaderEpochRequest.CONSUMER_REPLICA_ID, request.replicaId()); assertEquals(OffsetsForLeaderEpochRequest.CONSUMER_REPLICA_ID, request.replicaId());
} }
} }
@Test @Test
public void testDefaultReplicaId() { public void testDefaultReplicaId() {
for (short version = 0; version < ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) { for (short version : ApiKeys.OFFSET_FOR_LEADER_EPOCH.allVersions()) {
int replicaId = 1; int replicaId = 1;
OffsetsForLeaderEpochRequest.Builder builder = OffsetsForLeaderEpochRequest.Builder.forFollower( OffsetsForLeaderEpochRequest.Builder builder = OffsetsForLeaderEpochRequest.Builder.forFollower(
version, new OffsetForLeaderTopicCollection(), replicaId); version, new OffsetForLeaderTopicCollection(), replicaId);

View File

@ -18,6 +18,7 @@
package org.apache.kafka.common.requests; package org.apache.kafka.common.requests;
import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.CompressionType;
@ -32,11 +33,12 @@ import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ProduceRequestTest { public class ProduceRequestTest {
@ -151,7 +153,7 @@ public class ProduceRequestTest {
.setRecords(MemoryRecords.readableRecords(buffer))))).iterator())) .setRecords(MemoryRecords.readableRecords(buffer))))).iterator()))
.setAcks((short) 1) .setAcks((short) 1)
.setTimeoutMs(5000)); .setTimeoutMs(5000));
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); assertThrowsForAllVersions(requestBuilder, InvalidRecordException.class);
} }
@Test @Test
@ -166,7 +168,7 @@ public class ProduceRequestTest {
.setRecords(MemoryRecords.EMPTY)))).iterator())) .setRecords(MemoryRecords.EMPTY)))).iterator()))
.setAcks((short) 1) .setAcks((short) 1)
.setTimeoutMs(5000)); .setTimeoutMs(5000));
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); assertThrowsForAllVersions(requestBuilder, InvalidRecordException.class);
} }
@Test @Test
@ -186,7 +188,7 @@ public class ProduceRequestTest {
.setRecords(builder.build())))).iterator())) .setRecords(builder.build())))).iterator()))
.setAcks((short) 1) .setAcks((short) 1)
.setTimeoutMs(5000)); .setTimeoutMs(5000));
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); assertThrowsForAllVersions(requestBuilder, InvalidRecordException.class);
} }
@Test @Test
@ -206,7 +208,7 @@ public class ProduceRequestTest {
.iterator())) .iterator()))
.setAcks((short) 1) .setAcks((short) 1)
.setTimeoutMs(5000)); .setTimeoutMs(5000));
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); assertThrowsForAllVersions(requestBuilder, InvalidRecordException.class);
} }
@Test @Test
@ -230,7 +232,7 @@ public class ProduceRequestTest {
for (short version = 3; version < 7; version++) { for (short version = 3; version < 7; version++) {
ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData); ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData);
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); assertThrowsForAllVersions(requestBuilder, UnsupportedCompressionTypeException.class);
} }
// Works fine with current version (>= 7) // Works fine with current version (>= 7)
@ -291,20 +293,10 @@ public class ProduceRequestTest {
assertTrue(RequestTestUtils.hasIdempotentRecords(request)); assertTrue(RequestTestUtils.hasIdempotentRecords(request));
} }
private void assertThrowsInvalidRecordExceptionForAllVersions(ProduceRequest.Builder builder) { private static <T extends Throwable> void assertThrowsForAllVersions(ProduceRequest.Builder builder,
for (short version = builder.oldestAllowedVersion(); version < builder.latestAllowedVersion(); version++) { Class<T> expectedType) {
assertThrowsInvalidRecordException(builder, version); IntStream.range(builder.oldestAllowedVersion(), builder.latestAllowedVersion() + 1)
} .forEach(version -> assertThrows(expectedType, () -> builder.build((short) version).serialize()));
}
private void assertThrowsInvalidRecordException(ProduceRequest.Builder builder, short version) {
try {
builder.build(version).serialize();
fail("Builder did not raise " + InvalidRecordException.class.getName() + " as expected");
} catch (RuntimeException e) {
assertTrue(InvalidRecordException.class.isAssignableFrom(e.getClass()),
"Unexpected exception type " + e.getClass().getName());
}
} }
private ProduceRequest createNonIdempotentNonTransactionalRecords() { private ProduceRequest createNonIdempotentNonTransactionalRecords() {

View File

@ -89,10 +89,10 @@ public class ProduceResponseTest {
"Produce failed"); "Produce failed");
responseData.put(tp, partResponse); responseData.put(tp, partResponse);
for (short ver = 0; ver <= PRODUCE.latestVersion(); ver++) { for (short version : PRODUCE.allVersions()) {
ProduceResponse response = new ProduceResponse(responseData); ProduceResponse response = new ProduceResponse(responseData);
ProduceResponse.PartitionResponse deserialized = ProduceResponse.parse(response.serialize(ver), ver).responses().get(tp); ProduceResponse.PartitionResponse deserialized = ProduceResponse.parse(response.serialize(version), version).responses().get(tp);
if (ver >= 8) { if (version >= 8) {
assertEquals(1, deserialized.recordErrors.size()); assertEquals(1, deserialized.recordErrors.size());
assertEquals(3, deserialized.recordErrors.get(0).batchIndex); assertEquals(3, deserialized.recordErrors.get(0).batchIndex);
assertEquals("Record error", deserialized.recordErrors.get(0).message); assertEquals("Record error", deserialized.recordErrors.get(0).message);

View File

@ -221,11 +221,16 @@ import java.util.Set;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static org.apache.kafka.common.protocol.ApiKeys.CREATE_PARTITIONS;
import static org.apache.kafka.common.protocol.ApiKeys.CREATE_TOPICS;
import static org.apache.kafka.common.protocol.ApiKeys.DELETE_TOPICS;
import static org.apache.kafka.common.protocol.ApiKeys.DESCRIBE_CONFIGS; import static org.apache.kafka.common.protocol.ApiKeys.DESCRIBE_CONFIGS;
import static org.apache.kafka.common.protocol.ApiKeys.FETCH; import static org.apache.kafka.common.protocol.ApiKeys.FETCH;
import static org.apache.kafka.common.protocol.ApiKeys.JOIN_GROUP; import static org.apache.kafka.common.protocol.ApiKeys.JOIN_GROUP;
import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR;
import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS; import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS;
import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS; import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
import static org.apache.kafka.common.protocol.ApiKeys.STOP_REPLICA;
import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP; import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -267,26 +272,26 @@ public class RequestResponseTest {
checkErrorResponse(createHeartBeatRequest(), unknownServerException, true); checkErrorResponse(createHeartBeatRequest(), unknownServerException, true);
checkResponse(createHeartBeatResponse(), 0, true); checkResponse(createHeartBeatResponse(), 0, true);
for (int v = ApiKeys.JOIN_GROUP.oldestVersion(); v <= ApiKeys.JOIN_GROUP.latestVersion(); v++) { for (short version : JOIN_GROUP.allVersions()) {
checkRequest(createJoinGroupRequest(v), true); checkRequest(createJoinGroupRequest(version), true);
checkErrorResponse(createJoinGroupRequest(v), unknownServerException, true); checkErrorResponse(createJoinGroupRequest(version), unknownServerException, true);
checkResponse(createJoinGroupResponse(v), v, true); checkResponse(createJoinGroupResponse(version), version, true);
} }
for (int v = ApiKeys.SYNC_GROUP.oldestVersion(); v <= ApiKeys.SYNC_GROUP.latestVersion(); v++) { for (short version : SYNC_GROUP.allVersions()) {
checkRequest(createSyncGroupRequest(v), true); checkRequest(createSyncGroupRequest(version), true);
checkErrorResponse(createSyncGroupRequest(v), unknownServerException, true); checkErrorResponse(createSyncGroupRequest(version), unknownServerException, true);
checkResponse(createSyncGroupResponse(v), v, true); checkResponse(createSyncGroupResponse(version), version, true);
} }
checkRequest(createLeaveGroupRequest(), true); checkRequest(createLeaveGroupRequest(), true);
checkErrorResponse(createLeaveGroupRequest(), unknownServerException, true); checkErrorResponse(createLeaveGroupRequest(), unknownServerException, true);
checkResponse(createLeaveGroupResponse(), 0, true); checkResponse(createLeaveGroupResponse(), 0, true);
for (short v = ApiKeys.LIST_GROUPS.oldestVersion(); v <= ApiKeys.LIST_GROUPS.latestVersion(); v++) { for (short version : ApiKeys.LIST_GROUPS.allVersions()) {
checkRequest(createListGroupsRequest(v), false); checkRequest(createListGroupsRequest(version), false);
checkErrorResponse(createListGroupsRequest(v), unknownServerException, true); checkErrorResponse(createListGroupsRequest(version), unknownServerException, true);
checkResponse(createListGroupsResponse(v), v, true); checkResponse(createListGroupsResponse(version), version, true);
} }
checkRequest(createDescribeGroupRequest(), true); checkRequest(createDescribeGroupRequest(), true);
@ -295,10 +300,10 @@ public class RequestResponseTest {
checkRequest(createDeleteGroupsRequest(), true); checkRequest(createDeleteGroupsRequest(), true);
checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true); checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true);
checkResponse(createDeleteGroupsResponse(), 0, true); checkResponse(createDeleteGroupsResponse(), 0, true);
for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) { for (short version : LIST_OFFSETS.allVersions()) {
checkRequest(createListOffsetRequest(i), true); checkRequest(createListOffsetRequest(version), true);
checkErrorResponse(createListOffsetRequest(i), unknownServerException, true); checkErrorResponse(createListOffsetRequest(version), unknownServerException, true);
checkResponse(createListOffsetResponse(i), i, true); checkResponse(createListOffsetResponse(version), version, true);
} }
checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true); checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true);
checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true); checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true);
@ -331,18 +336,18 @@ public class RequestResponseTest {
checkResponse(createProduceResponse(), 2, true); checkResponse(createProduceResponse(), 2, true);
checkResponse(createProduceResponseWithErrorMessage(), 8, true); checkResponse(createProduceResponseWithErrorMessage(), 8, true);
for (int v = ApiKeys.STOP_REPLICA.oldestVersion(); v <= ApiKeys.STOP_REPLICA.latestVersion(); v++) { for (short version : STOP_REPLICA.allVersions()) {
checkRequest(createStopReplicaRequest(v, true), true); checkRequest(createStopReplicaRequest(version, true), true);
checkRequest(createStopReplicaRequest(v, false), true); checkRequest(createStopReplicaRequest(version, false), true);
checkErrorResponse(createStopReplicaRequest(v, true), unknownServerException, true); checkErrorResponse(createStopReplicaRequest(version, true), unknownServerException, true);
checkErrorResponse(createStopReplicaRequest(v, false), unknownServerException, true); checkErrorResponse(createStopReplicaRequest(version, false), unknownServerException, true);
checkResponse(createStopReplicaResponse(), v, true); checkResponse(createStopReplicaResponse(), version, true);
} }
for (int v = ApiKeys.LEADER_AND_ISR.oldestVersion(); v <= ApiKeys.LEADER_AND_ISR.latestVersion(); v++) { for (short version : LEADER_AND_ISR.allVersions()) {
checkRequest(createLeaderAndIsrRequest(v), true); checkRequest(createLeaderAndIsrRequest(version), true);
checkErrorResponse(createLeaderAndIsrRequest(v), unknownServerException, false); checkErrorResponse(createLeaderAndIsrRequest(version), unknownServerException, false);
checkResponse(createLeaderAndIsrResponse(v), v, true); checkResponse(createLeaderAndIsrResponse(version), version, true);
} }
checkRequest(createSaslHandshakeRequest(), true); checkRequest(createSaslHandshakeRequest(), true);
@ -353,23 +358,23 @@ public class RequestResponseTest {
checkResponse(createSaslAuthenticateResponse(), 0, true); checkResponse(createSaslAuthenticateResponse(), 0, true);
checkResponse(createSaslAuthenticateResponse(), 1, true); checkResponse(createSaslAuthenticateResponse(), 1, true);
for (int v = ApiKeys.CREATE_TOPICS.oldestVersion(); v <= ApiKeys.CREATE_TOPICS.latestVersion(); v++) { for (short version : CREATE_TOPICS.allVersions()) {
checkRequest(createCreateTopicRequest(v), true); checkRequest(createCreateTopicRequest(version), true);
checkErrorResponse(createCreateTopicRequest(v), unknownServerException, true); checkErrorResponse(createCreateTopicRequest(version), unknownServerException, true);
checkResponse(createCreateTopicResponse(), v, true); checkResponse(createCreateTopicResponse(), version, true);
} }
for (int v = ApiKeys.DELETE_TOPICS.oldestVersion(); v <= ApiKeys.DELETE_TOPICS.latestVersion(); v++) { for (short version : DELETE_TOPICS.allVersions()) {
checkRequest(createDeleteTopicsRequest(v), true); checkRequest(createDeleteTopicsRequest(version), true);
checkErrorResponse(createDeleteTopicsRequest(v), unknownServerException, true); checkErrorResponse(createDeleteTopicsRequest(version), unknownServerException, true);
checkResponse(createDeleteTopicsResponse(), v, true); checkResponse(createDeleteTopicsResponse(), version, true);
} }
for (int v = ApiKeys.CREATE_PARTITIONS.oldestVersion(); v <= ApiKeys.CREATE_PARTITIONS.latestVersion(); v++) { for (short version : CREATE_PARTITIONS.allVersions()) {
checkRequest(createCreatePartitionsRequest(v), true); checkRequest(createCreatePartitionsRequest(version), true);
checkRequest(createCreatePartitionsRequestWithAssignments(v), false); checkRequest(createCreatePartitionsRequestWithAssignments(version), false);
checkErrorResponse(createCreatePartitionsRequest(v), unknownServerException, true); checkErrorResponse(createCreatePartitionsRequest(version), unknownServerException, true);
checkResponse(createCreatePartitionsResponse(), v, true); checkResponse(createCreatePartitionsResponse(), version, true);
} }
checkRequest(createInitPidRequest(), true); checkRequest(createInitPidRequest(), true);
@ -518,75 +523,75 @@ public class RequestResponseTest {
@Test @Test
public void testApiVersionsSerialization() { public void testApiVersionsSerialization() {
for (short v : ApiKeys.API_VERSIONS.allVersions()) { for (short version : ApiKeys.API_VERSIONS.allVersions()) {
checkRequest(createApiVersionRequest(v), true); checkRequest(createApiVersionRequest(version), true);
checkErrorResponse(createApiVersionRequest(v), unknownServerException, true); checkErrorResponse(createApiVersionRequest(version), unknownServerException, true);
checkErrorResponse(createApiVersionRequest(v), new UnsupportedVersionException("Not Supported"), true); checkErrorResponse(createApiVersionRequest(version), new UnsupportedVersionException("Not Supported"), true);
checkResponse(createApiVersionResponse(), v, true); checkResponse(createApiVersionResponse(), version, true);
checkResponse(ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), v, true); checkResponse(ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), version, true);
} }
} }
@Test @Test
public void testBrokerHeartbeatSerialization() { public void testBrokerHeartbeatSerialization() {
for (short v : ApiKeys.BROKER_HEARTBEAT.allVersions()) { for (short version : ApiKeys.BROKER_HEARTBEAT.allVersions()) {
checkRequest(createBrokerHeartbeatRequest(v), true); checkRequest(createBrokerHeartbeatRequest(version), true);
checkErrorResponse(createBrokerHeartbeatRequest(v), unknownServerException, true); checkErrorResponse(createBrokerHeartbeatRequest(version), unknownServerException, true);
checkResponse(createBrokerHeartbeatResponse(), v, true); checkResponse(createBrokerHeartbeatResponse(), version, true);
} }
} }
@Test @Test
public void testBrokerRegistrationSerialization() { public void testBrokerRegistrationSerialization() {
for (short v : ApiKeys.BROKER_REGISTRATION.allVersions()) { for (short version : ApiKeys.BROKER_REGISTRATION.allVersions()) {
checkRequest(createBrokerRegistrationRequest(v), true); checkRequest(createBrokerRegistrationRequest(version), true);
checkErrorResponse(createBrokerRegistrationRequest(v), unknownServerException, true); checkErrorResponse(createBrokerRegistrationRequest(version), unknownServerException, true);
checkResponse(createBrokerRegistrationResponse(), 0, true); checkResponse(createBrokerRegistrationResponse(), 0, true);
} }
} }
@Test @Test
public void testDescribeProducersSerialization() { public void testDescribeProducersSerialization() {
for (short v : ApiKeys.DESCRIBE_PRODUCERS.allVersions()) { for (short version : ApiKeys.DESCRIBE_PRODUCERS.allVersions()) {
checkRequest(createDescribeProducersRequest(v), true); checkRequest(createDescribeProducersRequest(version), true);
checkErrorResponse(createDescribeProducersRequest(v), unknownServerException, true); checkErrorResponse(createDescribeProducersRequest(version), unknownServerException, true);
checkResponse(createDescribeProducersResponse(), v, true); checkResponse(createDescribeProducersResponse(), version, true);
} }
} }
@Test @Test
public void testDescribeTransactionsSerialization() { public void testDescribeTransactionsSerialization() {
for (short v : ApiKeys.DESCRIBE_TRANSACTIONS.allVersions()) { for (short version : ApiKeys.DESCRIBE_TRANSACTIONS.allVersions()) {
checkRequest(createDescribeTransactionsRequest(v), true); checkRequest(createDescribeTransactionsRequest(version), true);
checkErrorResponse(createDescribeTransactionsRequest(v), unknownServerException, true); checkErrorResponse(createDescribeTransactionsRequest(version), unknownServerException, true);
checkResponse(createDescribeTransactionsResponse(), v, true); checkResponse(createDescribeTransactionsResponse(), version, true);
} }
} }
@Test @Test
public void testListTransactionsSerialization() { public void testListTransactionsSerialization() {
for (short v : ApiKeys.LIST_TRANSACTIONS.allVersions()) { for (short version : ApiKeys.LIST_TRANSACTIONS.allVersions()) {
checkRequest(createListTransactionsRequest(v), true); checkRequest(createListTransactionsRequest(version), true);
checkErrorResponse(createListTransactionsRequest(v), unknownServerException, true); checkErrorResponse(createListTransactionsRequest(version), unknownServerException, true);
checkResponse(createListTransactionsResponse(), v, true); checkResponse(createListTransactionsResponse(), version, true);
} }
} }
@Test @Test
public void testDescribeClusterSerialization() { public void testDescribeClusterSerialization() {
for (short v : ApiKeys.DESCRIBE_CLUSTER.allVersions()) { for (short version : ApiKeys.DESCRIBE_CLUSTER.allVersions()) {
checkRequest(createDescribeClusterRequest(v), true); checkRequest(createDescribeClusterRequest(version), true);
checkErrorResponse(createDescribeClusterRequest(v), unknownServerException, true); checkErrorResponse(createDescribeClusterRequest(version), unknownServerException, true);
checkResponse(createDescribeClusterResponse(), v, true); checkResponse(createDescribeClusterResponse(), version, true);
} }
} }
@Test @Test
public void testUnregisterBrokerSerialization() { public void testUnregisterBrokerSerialization() {
for (short v : ApiKeys.UNREGISTER_BROKER.allVersions()) { for (short version : ApiKeys.UNREGISTER_BROKER.allVersions()) {
checkRequest(createUnregisterBrokerRequest(v), true); checkRequest(createUnregisterBrokerRequest(version), true);
checkErrorResponse(createUnregisterBrokerRequest(v), unknownServerException, true); checkErrorResponse(createUnregisterBrokerRequest(version), unknownServerException, true);
checkResponse(createUnregisterBrokerResponse(), v, true); checkResponse(createUnregisterBrokerResponse(), version, true);
} }
} }
@ -623,13 +628,12 @@ public class RequestResponseTest {
} }
private void checkOlderFetchVersions() { private void checkOlderFetchVersions() {
int latestVersion = FETCH.latestVersion(); for (short version : FETCH.allVersions()) {
for (int i = 0; i < latestVersion; ++i) { if (version > 7) {
if (i > 7) { checkErrorResponse(createFetchRequest(version), unknownServerException, true);
checkErrorResponse(createFetchRequest(i), unknownServerException, true);
} }
checkRequest(createFetchRequest(i), true); checkRequest(createFetchRequest(version), true);
checkResponse(createFetchResponse(i >= 4), i, true); checkResponse(createFetchResponse(version >= 4), version, true);
} }
} }
@ -668,12 +672,11 @@ public class RequestResponseTest {
} }
private void checkDescribeConfigsResponseVersions() { private void checkDescribeConfigsResponseVersions() {
for (int version = ApiKeys.DESCRIBE_CONFIGS.oldestVersion(); version < ApiKeys.DESCRIBE_CONFIGS.latestVersion(); ++version) { for (short version : ApiKeys.DESCRIBE_CONFIGS.allVersions()) {
short apiVersion = (short) version; DescribeConfigsResponse response = createDescribeConfigsResponse(version);
DescribeConfigsResponse response = createDescribeConfigsResponse(apiVersion);
DescribeConfigsResponse deserialized0 = (DescribeConfigsResponse) AbstractResponse.parseResponse(ApiKeys.DESCRIBE_CONFIGS, DescribeConfigsResponse deserialized0 = (DescribeConfigsResponse) AbstractResponse.parseResponse(ApiKeys.DESCRIBE_CONFIGS,
response.serialize(apiVersion), apiVersion); response.serialize(version), version);
verifyDescribeConfigsResponse(response, deserialized0, apiVersion); verifyDescribeConfigsResponse(response, deserialized0, version);
} }
} }
@ -859,7 +862,7 @@ public class RequestResponseTest {
verifyFetchResponseFullWrite(FETCH.latestVersion(), createFetchResponse(123)); verifyFetchResponseFullWrite(FETCH.latestVersion(), createFetchResponse(123));
verifyFetchResponseFullWrite(FETCH.latestVersion(), verifyFetchResponseFullWrite(FETCH.latestVersion(),
createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123)); createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123));
for (short version = 0; version <= FETCH.latestVersion(); version++) { for (short version : FETCH.allVersions()) {
verifyFetchResponseFullWrite(version, createFetchResponse(version >= 4)); verifyFetchResponseFullWrite(version, createFetchResponse(version >= 4));
} }
} }

View File

@ -66,7 +66,7 @@ public class StopReplicaRequestTest {
} }
} }
for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { for (short version : STOP_REPLICA.allVersions()) {
StopReplicaRequest.Builder builder = new StopReplicaRequest.Builder(version, StopReplicaRequest.Builder builder = new StopReplicaRequest.Builder(version,
0, 0, 0L, false, topicStates); 0, 0, 0L, false, topicStates);
StopReplicaRequest request = builder.build(); StopReplicaRequest request = builder.build();
@ -93,7 +93,7 @@ public class StopReplicaRequestTest {
Map<TopicPartition, StopReplicaPartitionState> expectedPartitionStates = Map<TopicPartition, StopReplicaPartitionState> expectedPartitionStates =
StopReplicaRequestTest.partitionStates(topicStates); StopReplicaRequestTest.partitionStates(topicStates);
for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { for (short version : STOP_REPLICA.allVersions()) {
StopReplicaRequest request = new StopReplicaRequest.Builder(version, 0, 1, 0, StopReplicaRequest request = new StopReplicaRequest.Builder(version, 0, 1, 0,
deletePartitions, topicStates).build(version); deletePartitions, topicStates).build(version);
StopReplicaRequestData data = request.data(); StopReplicaRequestData data = request.data();
@ -128,7 +128,7 @@ public class StopReplicaRequestTest {
public void testTopicStatesNormalization() { public void testTopicStatesNormalization() {
List<StopReplicaTopicState> topicStates = topicStates(true); List<StopReplicaTopicState> topicStates = topicStates(true);
for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { for (short version : STOP_REPLICA.allVersions()) {
// Create a request for version to get its serialized form // Create a request for version to get its serialized form
StopReplicaRequest baseRequest = new StopReplicaRequest.Builder(version, 0, 1, 0, StopReplicaRequest baseRequest = new StopReplicaRequest.Builder(version, 0, 1, 0,
true, topicStates).build(version); true, topicStates).build(version);
@ -163,7 +163,7 @@ public class StopReplicaRequestTest {
public void testPartitionStatesNormalization() { public void testPartitionStatesNormalization() {
List<StopReplicaTopicState> topicStates = topicStates(true); List<StopReplicaTopicState> topicStates = topicStates(true);
for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { for (short version : STOP_REPLICA.allVersions()) {
// Create a request for version to get its serialized form // Create a request for version to get its serialized form
StopReplicaRequest baseRequest = new StopReplicaRequest.Builder(version, 0, 1, 0, StopReplicaRequest baseRequest = new StopReplicaRequest.Builder(version, 0, 1, 0,
true, topicStates).build(version); true, topicStates).build(version);

View File

@ -44,7 +44,7 @@ public class StopReplicaResponseTest {
new StopReplicaPartitionState().setPartitionIndex(0), new StopReplicaPartitionState().setPartitionIndex(0),
new StopReplicaPartitionState().setPartitionIndex(1)))); new StopReplicaPartitionState().setPartitionIndex(1))));
for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { for (short version : STOP_REPLICA.allVersions()) {
StopReplicaRequest request = new StopReplicaRequest.Builder(version, StopReplicaRequest request = new StopReplicaRequest.Builder(version,
15, 20, 0, false, topicStates).build(version); 15, 20, 0, false, topicStates).build(version);
StopReplicaResponse response = request StopReplicaResponse response = request

View File

@ -116,7 +116,7 @@ public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest {
)) ))
); );
for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) { for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
final TxnOffsetCommitRequest request; final TxnOffsetCommitRequest request;
if (version < 3) { if (version < 3) {
request = builder.build(version); request = builder.build(version);

View File

@ -54,7 +54,7 @@ public class TxnOffsetCommitResponseTest extends OffsetCommitResponseTest {
.setErrorCode(errorTwo.code()))) .setErrorCode(errorTwo.code())))
)); ));
for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) { for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) {
TxnOffsetCommitResponse response = TxnOffsetCommitResponse.parse( TxnOffsetCommitResponse response = TxnOffsetCommitResponse.parse(
MessageUtil.toByteBuffer(data, version), version); MessageUtil.toByteBuffer(data, version), version);
assertEquals(expectedErrorCounts, response.errorCounts()); assertEquals(expectedErrorCounts, response.errorCounts());

View File

@ -61,7 +61,7 @@ public class UpdateMetadataRequestTest {
@Test @Test
public void testGetErrorResponse() { public void testGetErrorResponse() {
for (short version = UPDATE_METADATA.oldestVersion(); version < UPDATE_METADATA.latestVersion(); version++) { for (short version : UPDATE_METADATA.allVersions()) {
UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder( UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder(
version, 0, 0, 0, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap()); version, 0, 0, 0, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
UpdateMetadataRequest request = builder.build(); UpdateMetadataRequest request = builder.build();
@ -81,7 +81,7 @@ public class UpdateMetadataRequestTest {
public void testVersionLogic() { public void testVersionLogic() {
String topic0 = "topic0"; String topic0 = "topic0";
String topic1 = "topic1"; String topic1 = "topic1";
for (short version = UPDATE_METADATA.oldestVersion(); version <= UPDATE_METADATA.latestVersion(); version++) { for (short version : UPDATE_METADATA.allVersions()) {
List<UpdateMetadataPartitionState> partitionStates = asList( List<UpdateMetadataPartitionState> partitionStates = asList(
new UpdateMetadataPartitionState() new UpdateMetadataPartitionState()
.setTopicName(topic0) .setTopicName(topic0)

View File

@ -51,7 +51,7 @@ public class WriteTxnMarkersRequestTest {
@Test @Test
public void testConstructor() { public void testConstructor() {
WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(), markers); WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(), markers);
for (short version = 0; version <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(); version++) { for (short version : ApiKeys.WRITE_TXN_MARKERS.allVersions()) {
WriteTxnMarkersRequest request = builder.build(version); WriteTxnMarkersRequest request = builder.build(version);
assertEquals(1, request.markers().size()); assertEquals(1, request.markers().size());
WriteTxnMarkersRequest.TxnMarkerEntry marker = request.markers().get(0); WriteTxnMarkersRequest.TxnMarkerEntry marker = request.markers().get(0);
@ -66,7 +66,7 @@ public class WriteTxnMarkersRequestTest {
@Test @Test
public void testGetErrorResponse() { public void testGetErrorResponse() {
WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(), markers); WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(), markers);
for (short version = 0; version <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(); version++) { for (short version : ApiKeys.WRITE_TXN_MARKERS.allVersions()) {
WriteTxnMarkersRequest request = builder.build(version); WriteTxnMarkersRequest request = builder.build(version);
WriteTxnMarkersResponse errorResponse = WriteTxnMarkersResponse errorResponse =
request.getErrorResponse(throttleTimeMs, Errors.UNKNOWN_PRODUCER_ID.exception()); request.getErrorResponse(throttleTimeMs, Errors.UNKNOWN_PRODUCER_ID.exception());