diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 3297dc5734a..8ec2d02ea3d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -201,7 +201,7 @@ public enum ApiKeys { public List allVersions() { List versions = new ArrayList<>(latestVersion() - oldestVersion() + 1); - for (short version = oldestVersion(); version < latestVersion(); version++) { + for (short version = oldestVersion(); version <= latestVersion(); version++) { versions.add(version); } return versions; diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 5dc379e0e7f..e191ad6526a 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -186,7 +186,7 @@ public final class MessageTest { .setPartitions(Collections.singletonList(partition))); Supplier response = () -> new ListOffsetsResponseData() .setTopics(topics); - for (short version = 0; version <= ApiKeys.LIST_OFFSETS.latestVersion(); version++) { + for (short version : ApiKeys.LIST_OFFSETS.allVersions()) { ListOffsetsResponseData responseData = response.get(); if (version > 0) { responseData.topics().get(0).partitions().get(0) @@ -459,7 +459,7 @@ public final class MessageTest { )))) .setRetentionTimeMs(20); - for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { OffsetCommitRequestData requestData = request.get(); if (version < 1) { requestData.setMemberId(""); @@ -485,7 +485,7 @@ public final class MessageTest { if (version == 1) { testEquivalentMessageRoundTrip(version, requestData); } else if (version >= 2 && version <= 4) { - testAllMessageRoundTripsBetweenVersions(version, (short) 4, requestData, requestData); + testAllMessageRoundTripsBetweenVersions(version, (short) 5, requestData, requestData); } else { testAllMessageRoundTripsFromVersion(version, requestData); } @@ -509,7 +509,7 @@ public final class MessageTest { ) .setThrottleTimeMs(20); - for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { OffsetCommitResponseData responseData = response.get(); if (version < 3) { responseData.setThrottleTimeMs(0); @@ -568,7 +568,7 @@ public final class MessageTest { .setCommittedOffset(offset) )))); - for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) { + for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) { TxnOffsetCommitRequestData requestData = request.get(); if (version < 2) { requestData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1); @@ -632,7 +632,7 @@ public final class MessageTest { .setTopics(topics) .setRequireStable(true); - for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { final short finalVersion = version; if (version < 2) { assertThrows(NullPointerException.class, () -> testAllMessageRoundTripsFromVersion(finalVersion, allPartitionData)); @@ -661,7 +661,7 @@ public final class MessageTest { .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()))))) .setErrorCode(Errors.NOT_COORDINATOR.code()) .setThrottleTimeMs(10); - for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { OffsetFetchResponseData responseData = response.get(); if (version <= 1) { responseData.setErrorCode(Errors.NONE.code()); @@ -720,7 +720,7 @@ public final class MessageTest { .setErrorMessage(errorMessage)))).iterator())) .setThrottleTimeMs(throttleTimeMs); - for (short version = 0; version <= ApiKeys.PRODUCE.latestVersion(); version++) { + for (short version : ApiKeys.PRODUCE.allVersions()) { ProduceResponseData responseData = response.get(); if (version < 8) { @@ -741,9 +741,9 @@ public final class MessageTest { } if (version >= 3 && version <= 4) { - testAllMessageRoundTripsBetweenVersions(version, (short) 4, responseData, responseData); + testAllMessageRoundTripsBetweenVersions(version, (short) 5, responseData, responseData); } else if (version >= 6 && version <= 7) { - testAllMessageRoundTripsBetweenVersions(version, (short) 7, responseData, responseData); + testAllMessageRoundTripsBetweenVersions(version, (short) 8, responseData, responseData); } else { testEquivalentMessageRoundTrip(version, responseData); } @@ -924,8 +924,8 @@ public final class MessageTest { @Test public void testWriteNullForNonNullableFieldRaisesException() { CreateTopicsRequestData createTopics = new CreateTopicsRequestData().setTopics(null); - for (short i = (short) 0; i <= createTopics.highestSupportedVersion(); i++) { - verifyWriteRaisesNpe(i, createTopics); + for (short version : ApiKeys.CREATE_TOPICS.allVersions()) { + verifyWriteRaisesNpe(version, createTopics); } MetadataRequestData metadata = new MetadataRequestData().setTopics(null); verifyWriteRaisesNpe((short) 0, metadata); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequestTest.java index 24cb9a5d702..acba8be966a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequestTest.java @@ -43,7 +43,7 @@ public class AddPartitionsToTxnRequestTest { AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId, producerId, producerEpoch, partitions); - for (short version = 0; version <= ApiKeys.ADD_PARTITIONS_TO_TXN.latestVersion(); version++) { + for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) { AddPartitionsToTxnRequest request = builder.build(version); assertEquals(transactionalId, request.data().transactionalId()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java index b7901880bf4..5b67bd47a01 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java @@ -89,7 +89,7 @@ public class AddPartitionsToTxnResponseTest { .setThrottleTimeMs(throttleTimeMs); AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(data); - for (short version = 0; version <= ApiKeys.ADD_PARTITIONS_TO_TXN.latestVersion(); version++) { + for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) { AddPartitionsToTxnResponse parsedResponse = AddPartitionsToTxnResponse.parse(response.serialize(version), version); assertEquals(expectedErrorCounts, parsedResponse.errorCounts()); assertEquals(throttleTimeMs, parsedResponse.throttleTimeMs()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java index 04f294c7182..867be713ba2 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ControlledShutdownRequestTest.java @@ -38,7 +38,7 @@ public class ControlledShutdownRequestTest { @Test public void testGetErrorResponse() { - for (short version = CONTROLLED_SHUTDOWN.oldestVersion(); version < CONTROLLED_SHUTDOWN.latestVersion(); version++) { + for (short version : CONTROLLED_SHUTDOWN.allVersions()) { ControlledShutdownRequest.Builder builder = new ControlledShutdownRequest.Builder( new ControlledShutdownRequestData().setBrokerId(1), version); ControlledShutdownRequest request = builder.build(); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/EndTxnRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/EndTxnRequestTest.java index 722e9a5a589..f14bf661619 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/EndTxnRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/EndTxnRequestTest.java @@ -41,7 +41,7 @@ public class EndTxnRequestTest { .setProducerId(producerId) .setTransactionalId(transactionId)); - for (short version = 0; version <= ApiKeys.END_TXN.latestVersion(); version++) { + for (short version : ApiKeys.END_TXN.allVersions()) { EndTxnRequest request = builder.build(version); EndTxnResponse response = request.getErrorResponse(throttleTimeMs, Errors.NOT_COORDINATOR.exception()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/EndTxnResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/EndTxnResponseTest.java index 34646dc0b4b..39c4bc04104 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/EndTxnResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/EndTxnResponseTest.java @@ -38,7 +38,7 @@ public class EndTxnResponseTest { Map expectedErrorCounts = Collections.singletonMap(Errors.NOT_COORDINATOR, 1); - for (short version = 0; version <= ApiKeys.END_TXN.latestVersion(); version++) { + for (short version : ApiKeys.END_TXN.allVersions()) { EndTxnResponse response = new EndTxnResponse(data); assertEquals(expectedErrorCounts, response.errorCounts()); assertEquals(throttleTimeMs, response.throttleTimeMs()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/EnvelopeRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/EnvelopeRequestTest.java index a7bccd56cf1..36b8618f49d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/EnvelopeRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/EnvelopeRequestTest.java @@ -46,7 +46,7 @@ public class EnvelopeRequestTest { @Test public void testToSend() throws IOException { - for (short version = ApiKeys.ENVELOPE.oldestVersion(); version <= ApiKeys.ENVELOPE.latestVersion(); version++) { + for (short version : ApiKeys.ENVELOPE.allVersions()) { ByteBuffer requestData = ByteBuffer.wrap("foobar".getBytes()); RequestHeader header = new RequestHeader(ApiKeys.ENVELOPE, version, "clientId", 15); EnvelopeRequest request = new EnvelopeRequest.Builder( diff --git a/clients/src/test/java/org/apache/kafka/common/requests/EnvelopeResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/EnvelopeResponseTest.java index 0a384cdd166..e0fa2fdbcdd 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/EnvelopeResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/EnvelopeResponseTest.java @@ -32,7 +32,7 @@ class EnvelopeResponseTest { @Test public void testToSend() { - for (short version = ApiKeys.ENVELOPE.oldestVersion(); version <= ApiKeys.ENVELOPE.latestVersion(); version++) { + for (short version : ApiKeys.ENVELOPE.allVersions()) { ByteBuffer responseData = ByteBuffer.wrap("foobar".getBytes()); EnvelopeResponse response = new EnvelopeResponse(responseData, Errors.NONE); short headerVersion = ApiKeys.ENVELOPE.responseHeaderVersion(version); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java index 4fe51a0dccd..de9914c575e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java @@ -64,8 +64,7 @@ public class LeaderAndIsrRequestTest { Uuid topicId = Uuid.randomUuid(); String topicName = "topic"; int partition = 0; - - for (short version = LEADER_AND_ISR.oldestVersion(); version <= LEADER_AND_ISR.latestVersion(); version++) { + for (short version : LEADER_AND_ISR.allVersions()) { LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(version, 0, 0, 0, Collections.singletonList(new LeaderAndIsrPartitionState() .setTopicName(topicName) @@ -108,7 +107,7 @@ public class LeaderAndIsrRequestTest { */ @Test public void testVersionLogic() { - for (short version = LEADER_AND_ISR.oldestVersion(); version <= LEADER_AND_ISR.latestVersion(); version++) { + for (short version : LEADER_AND_ISR.allVersions()) { List partitionStates = asList( new LeaderAndIsrPartitionState() .setTopicName("topic0") diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java index 9ae2fdb4204..9f46304a4de 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java @@ -71,7 +71,7 @@ public class LeaderAndIsrResponseTest { @Test public void testErrorCountsWithTopLevelError() { - for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { + for (short version : LEADER_AND_ISR.allVersions()) { LeaderAndIsrResponse response; if (version < 5) { List partitions = createPartitions("foo", @@ -92,7 +92,7 @@ public class LeaderAndIsrResponseTest { @Test public void testErrorCountsNoTopLevelError() { - for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { + for (short version : LEADER_AND_ISR.allVersions()) { LeaderAndIsrResponse response; if (version < 5) { List partitions = createPartitions("foo", @@ -116,7 +116,7 @@ public class LeaderAndIsrResponseTest { @Test public void testToString() { - for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) { + for (short version : LEADER_AND_ISR.allVersions()) { LeaderAndIsrResponse response; if (version < 5) { List partitions = createPartitions("foo", diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java index 411efef71f5..1694ef5fdf9 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupRequestTest.java @@ -67,7 +67,7 @@ public class LeaveGroupRequestTest { .setGroupId(groupId) .setMembers(members); - for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { + for (short version : ApiKeys.LEAVE_GROUP.allVersions()) { try { LeaveGroupRequest request = builder.build(version); if (version <= 2) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java index cc9819133f4..d5132182cee 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaveGroupResponseTest.java @@ -67,7 +67,7 @@ public class LeaveGroupResponseTest { expectedErrorCounts.put(Errors.UNKNOWN_MEMBER_ID, 1); expectedErrorCounts.put(Errors.FENCED_INSTANCE_ID, 1); - for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { + for (short version : ApiKeys.LEAVE_GROUP.allVersions()) { LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(memberResponses, Errors.NONE, throttleTimeMs, @@ -95,7 +95,7 @@ public class LeaveGroupResponseTest { @Test public void testShouldThrottle() { LeaveGroupResponse response = new LeaveGroupResponse(new LeaveGroupResponseData()); - for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { + for (short version : ApiKeys.LEAVE_GROUP.allVersions()) { if (version >= 2) { assertTrue(response.shouldClientThrottle(version)); } else { @@ -109,7 +109,7 @@ public class LeaveGroupResponseTest { LeaveGroupResponseData responseData = new LeaveGroupResponseData() .setErrorCode(Errors.NONE.code()) .setThrottleTimeMs(throttleTimeMs); - for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { + for (short version : ApiKeys.LEAVE_GROUP.allVersions()) { LeaveGroupResponse primaryResponse = LeaveGroupResponse.parse( MessageUtil.toByteBuffer(responseData, version), version); LeaveGroupResponse secondaryResponse = LeaveGroupResponse.parse( @@ -129,7 +129,7 @@ public class LeaveGroupResponseTest { .setErrorCode(Errors.NOT_COORDINATOR.code()) .setThrottleTimeMs(throttleTimeMs); - for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { + for (short version : ApiKeys.LEAVE_GROUP.allVersions()) { ByteBuffer buffer = MessageUtil.toByteBuffer(data, version); LeaveGroupResponse leaveGroupResponse = LeaveGroupResponse.parse(buffer, version); assertEquals(expectedErrorCounts, leaveGroupResponse.errorCounts()); @@ -146,7 +146,7 @@ public class LeaveGroupResponseTest { @Test public void testEqualityWithMemberResponses() { - for (short version = 0; version <= ApiKeys.LEAVE_GROUP.latestVersion(); version++) { + for (short version : ApiKeys.LEAVE_GROUP.allVersions()) { List localResponses = version > 2 ? memberResponses : memberResponses.subList(0, 1); LeaveGroupResponse primaryResponse = new LeaveGroupResponse(localResponses, Errors.NONE, diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java index cb853162209..08ae7a3fbd5 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java @@ -91,7 +91,7 @@ public class OffsetCommitRequestTest { OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data); - for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) { + for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) { OffsetCommitRequest request = builder.build(version); assertEquals(expectedOffsets, request.offsets()); @@ -130,7 +130,7 @@ public class OffsetCommitRequestTest { .setGroupInstanceId(groupInstanceId) ); - for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { if (version >= 7) { builder.build(version); } else { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java index c6791e64f4f..b9ce03f1e3a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java @@ -85,7 +85,7 @@ public class OffsetCommitResponseTest { )) .setThrottleTimeMs(throttleTimeMs); - for (short version = 0; version <= ApiKeys.OFFSET_COMMIT.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) { ByteBuffer buffer = MessageUtil.toByteBuffer(data, version); OffsetCommitResponse response = OffsetCommitResponse.parse(buffer, version); assertEquals(expectedErrorCounts, response.errorCounts()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java index 51db93d629d..ddb2cd9a943 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java @@ -75,7 +75,7 @@ public class OffsetFetchRequestTest { )); } - for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { OffsetFetchRequest request = builder.build(version); assertFalse(request.isAllPartitions()); assertEquals(groupId, request.groupId()); @@ -101,7 +101,7 @@ public class OffsetFetchRequestTest { @Test public void testConstructorFailForUnsupportedRequireStable() { - for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { // The builder needs to be initialized every cycle as the internal data `requireStable` flag is flipped. builder = new OffsetFetchRequest.Builder(groupId, true, null, false); final short finalVersion = version; @@ -123,7 +123,7 @@ public class OffsetFetchRequestTest { @Test public void testBuildThrowForUnsupportedRequireStable() { - for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { builder = new OffsetFetchRequest.Builder(groupId, true, null, true); if (version < 7) { final short finalVersion = version; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java index 62cf3a95353..e202fc200f7 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchResponseTest.java @@ -103,7 +103,7 @@ public class OffsetFetchResponseTest { OffsetFetchResponse latestResponse = new OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap); - for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { OffsetFetchResponseData data = new OffsetFetchResponseData( new ByteBufferAccessor(latestResponse.serialize(version)), version); @@ -154,7 +154,7 @@ public class OffsetFetchResponseTest { @Test public void testShouldThrottle() { OffsetFetchResponse response = new OffsetFetchResponse(throttleTimeMs, Errors.NONE, partitionDataMap); - for (short version = 0; version <= ApiKeys.OFFSET_FETCH.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { if (version >= 4) { assertTrue(response.shouldClientThrottle(version)); } else { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java index 3d4b41b52e4..e5dacd5b3d7 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java @@ -34,15 +34,15 @@ public class OffsetsForLeaderEpochRequestTest { assertThrows(UnsupportedVersionException.class, () -> builder.build(v)); } - for (short version = 3; version < ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) { - OffsetsForLeaderEpochRequest request = builder.build((short) 3); + for (short version = 3; version <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) { + OffsetsForLeaderEpochRequest request = builder.build(version); assertEquals(OffsetsForLeaderEpochRequest.CONSUMER_REPLICA_ID, request.replicaId()); } } @Test public void testDefaultReplicaId() { - for (short version = 0; version < ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) { + for (short version : ApiKeys.OFFSET_FOR_LEADER_EPOCH.allVersions()) { int replicaId = 1; OffsetsForLeaderEpochRequest.Builder builder = OffsetsForLeaderEpochRequest.Builder.forFollower( version, new OffsetForLeaderTopicCollection(), replicaId); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index a2367e49042..fee026e7481 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.CompressionType; @@ -32,11 +33,12 @@ import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; +import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; public class ProduceRequestTest { @@ -151,7 +153,7 @@ public class ProduceRequestTest { .setRecords(MemoryRecords.readableRecords(buffer))))).iterator())) .setAcks((short) 1) .setTimeoutMs(5000)); - assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); + assertThrowsForAllVersions(requestBuilder, InvalidRecordException.class); } @Test @@ -166,7 +168,7 @@ public class ProduceRequestTest { .setRecords(MemoryRecords.EMPTY)))).iterator())) .setAcks((short) 1) .setTimeoutMs(5000)); - assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); + assertThrowsForAllVersions(requestBuilder, InvalidRecordException.class); } @Test @@ -186,7 +188,7 @@ public class ProduceRequestTest { .setRecords(builder.build())))).iterator())) .setAcks((short) 1) .setTimeoutMs(5000)); - assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); + assertThrowsForAllVersions(requestBuilder, InvalidRecordException.class); } @Test @@ -206,7 +208,7 @@ public class ProduceRequestTest { .iterator())) .setAcks((short) 1) .setTimeoutMs(5000)); - assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); + assertThrowsForAllVersions(requestBuilder, InvalidRecordException.class); } @Test @@ -230,7 +232,7 @@ public class ProduceRequestTest { for (short version = 3; version < 7; version++) { ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData); - assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder); + assertThrowsForAllVersions(requestBuilder, UnsupportedCompressionTypeException.class); } // Works fine with current version (>= 7) @@ -291,20 +293,10 @@ public class ProduceRequestTest { assertTrue(RequestTestUtils.hasIdempotentRecords(request)); } - private void assertThrowsInvalidRecordExceptionForAllVersions(ProduceRequest.Builder builder) { - for (short version = builder.oldestAllowedVersion(); version < builder.latestAllowedVersion(); version++) { - assertThrowsInvalidRecordException(builder, version); - } - } - - private void assertThrowsInvalidRecordException(ProduceRequest.Builder builder, short version) { - try { - builder.build(version).serialize(); - fail("Builder did not raise " + InvalidRecordException.class.getName() + " as expected"); - } catch (RuntimeException e) { - assertTrue(InvalidRecordException.class.isAssignableFrom(e.getClass()), - "Unexpected exception type " + e.getClass().getName()); - } + private static void assertThrowsForAllVersions(ProduceRequest.Builder builder, + Class expectedType) { + IntStream.range(builder.oldestAllowedVersion(), builder.latestAllowedVersion() + 1) + .forEach(version -> assertThrows(expectedType, () -> builder.build((short) version).serialize())); } private ProduceRequest createNonIdempotentNonTransactionalRecords() { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java index bfadb8c3bbd..771ff0da7b4 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java @@ -89,10 +89,10 @@ public class ProduceResponseTest { "Produce failed"); responseData.put(tp, partResponse); - for (short ver = 0; ver <= PRODUCE.latestVersion(); ver++) { + for (short version : PRODUCE.allVersions()) { ProduceResponse response = new ProduceResponse(responseData); - ProduceResponse.PartitionResponse deserialized = ProduceResponse.parse(response.serialize(ver), ver).responses().get(tp); - if (ver >= 8) { + ProduceResponse.PartitionResponse deserialized = ProduceResponse.parse(response.serialize(version), version).responses().get(tp); + if (version >= 8) { assertEquals(1, deserialized.recordErrors.size()); assertEquals(3, deserialized.recordErrors.get(0).batchIndex); assertEquals("Record error", deserialized.recordErrors.get(0).message); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index cebb99d17ef..d3d965d3910 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -221,11 +221,16 @@ import java.util.Set; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.apache.kafka.common.protocol.ApiKeys.CREATE_PARTITIONS; +import static org.apache.kafka.common.protocol.ApiKeys.CREATE_TOPICS; +import static org.apache.kafka.common.protocol.ApiKeys.DELETE_TOPICS; import static org.apache.kafka.common.protocol.ApiKeys.DESCRIBE_CONFIGS; import static org.apache.kafka.common.protocol.ApiKeys.FETCH; import static org.apache.kafka.common.protocol.ApiKeys.JOIN_GROUP; +import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR; import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS; import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS; +import static org.apache.kafka.common.protocol.ApiKeys.STOP_REPLICA; import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP; import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -267,26 +272,26 @@ public class RequestResponseTest { checkErrorResponse(createHeartBeatRequest(), unknownServerException, true); checkResponse(createHeartBeatResponse(), 0, true); - for (int v = ApiKeys.JOIN_GROUP.oldestVersion(); v <= ApiKeys.JOIN_GROUP.latestVersion(); v++) { - checkRequest(createJoinGroupRequest(v), true); - checkErrorResponse(createJoinGroupRequest(v), unknownServerException, true); - checkResponse(createJoinGroupResponse(v), v, true); + for (short version : JOIN_GROUP.allVersions()) { + checkRequest(createJoinGroupRequest(version), true); + checkErrorResponse(createJoinGroupRequest(version), unknownServerException, true); + checkResponse(createJoinGroupResponse(version), version, true); } - for (int v = ApiKeys.SYNC_GROUP.oldestVersion(); v <= ApiKeys.SYNC_GROUP.latestVersion(); v++) { - checkRequest(createSyncGroupRequest(v), true); - checkErrorResponse(createSyncGroupRequest(v), unknownServerException, true); - checkResponse(createSyncGroupResponse(v), v, true); + for (short version : SYNC_GROUP.allVersions()) { + checkRequest(createSyncGroupRequest(version), true); + checkErrorResponse(createSyncGroupRequest(version), unknownServerException, true); + checkResponse(createSyncGroupResponse(version), version, true); } checkRequest(createLeaveGroupRequest(), true); checkErrorResponse(createLeaveGroupRequest(), unknownServerException, true); checkResponse(createLeaveGroupResponse(), 0, true); - for (short v = ApiKeys.LIST_GROUPS.oldestVersion(); v <= ApiKeys.LIST_GROUPS.latestVersion(); v++) { - checkRequest(createListGroupsRequest(v), false); - checkErrorResponse(createListGroupsRequest(v), unknownServerException, true); - checkResponse(createListGroupsResponse(v), v, true); + for (short version : ApiKeys.LIST_GROUPS.allVersions()) { + checkRequest(createListGroupsRequest(version), false); + checkErrorResponse(createListGroupsRequest(version), unknownServerException, true); + checkResponse(createListGroupsResponse(version), version, true); } checkRequest(createDescribeGroupRequest(), true); @@ -295,10 +300,10 @@ public class RequestResponseTest { checkRequest(createDeleteGroupsRequest(), true); checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true); checkResponse(createDeleteGroupsResponse(), 0, true); - for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) { - checkRequest(createListOffsetRequest(i), true); - checkErrorResponse(createListOffsetRequest(i), unknownServerException, true); - checkResponse(createListOffsetResponse(i), i, true); + for (short version : LIST_OFFSETS.allVersions()) { + checkRequest(createListOffsetRequest(version), true); + checkErrorResponse(createListOffsetRequest(version), unknownServerException, true); + checkResponse(createListOffsetResponse(version), version, true); } checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true); checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true); @@ -331,18 +336,18 @@ public class RequestResponseTest { checkResponse(createProduceResponse(), 2, true); checkResponse(createProduceResponseWithErrorMessage(), 8, true); - for (int v = ApiKeys.STOP_REPLICA.oldestVersion(); v <= ApiKeys.STOP_REPLICA.latestVersion(); v++) { - checkRequest(createStopReplicaRequest(v, true), true); - checkRequest(createStopReplicaRequest(v, false), true); - checkErrorResponse(createStopReplicaRequest(v, true), unknownServerException, true); - checkErrorResponse(createStopReplicaRequest(v, false), unknownServerException, true); - checkResponse(createStopReplicaResponse(), v, true); + for (short version : STOP_REPLICA.allVersions()) { + checkRequest(createStopReplicaRequest(version, true), true); + checkRequest(createStopReplicaRequest(version, false), true); + checkErrorResponse(createStopReplicaRequest(version, true), unknownServerException, true); + checkErrorResponse(createStopReplicaRequest(version, false), unknownServerException, true); + checkResponse(createStopReplicaResponse(), version, true); } - for (int v = ApiKeys.LEADER_AND_ISR.oldestVersion(); v <= ApiKeys.LEADER_AND_ISR.latestVersion(); v++) { - checkRequest(createLeaderAndIsrRequest(v), true); - checkErrorResponse(createLeaderAndIsrRequest(v), unknownServerException, false); - checkResponse(createLeaderAndIsrResponse(v), v, true); + for (short version : LEADER_AND_ISR.allVersions()) { + checkRequest(createLeaderAndIsrRequest(version), true); + checkErrorResponse(createLeaderAndIsrRequest(version), unknownServerException, false); + checkResponse(createLeaderAndIsrResponse(version), version, true); } checkRequest(createSaslHandshakeRequest(), true); @@ -353,23 +358,23 @@ public class RequestResponseTest { checkResponse(createSaslAuthenticateResponse(), 0, true); checkResponse(createSaslAuthenticateResponse(), 1, true); - for (int v = ApiKeys.CREATE_TOPICS.oldestVersion(); v <= ApiKeys.CREATE_TOPICS.latestVersion(); v++) { - checkRequest(createCreateTopicRequest(v), true); - checkErrorResponse(createCreateTopicRequest(v), unknownServerException, true); - checkResponse(createCreateTopicResponse(), v, true); + for (short version : CREATE_TOPICS.allVersions()) { + checkRequest(createCreateTopicRequest(version), true); + checkErrorResponse(createCreateTopicRequest(version), unknownServerException, true); + checkResponse(createCreateTopicResponse(), version, true); } - for (int v = ApiKeys.DELETE_TOPICS.oldestVersion(); v <= ApiKeys.DELETE_TOPICS.latestVersion(); v++) { - checkRequest(createDeleteTopicsRequest(v), true); - checkErrorResponse(createDeleteTopicsRequest(v), unknownServerException, true); - checkResponse(createDeleteTopicsResponse(), v, true); + for (short version : DELETE_TOPICS.allVersions()) { + checkRequest(createDeleteTopicsRequest(version), true); + checkErrorResponse(createDeleteTopicsRequest(version), unknownServerException, true); + checkResponse(createDeleteTopicsResponse(), version, true); } - for (int v = ApiKeys.CREATE_PARTITIONS.oldestVersion(); v <= ApiKeys.CREATE_PARTITIONS.latestVersion(); v++) { - checkRequest(createCreatePartitionsRequest(v), true); - checkRequest(createCreatePartitionsRequestWithAssignments(v), false); - checkErrorResponse(createCreatePartitionsRequest(v), unknownServerException, true); - checkResponse(createCreatePartitionsResponse(), v, true); + for (short version : CREATE_PARTITIONS.allVersions()) { + checkRequest(createCreatePartitionsRequest(version), true); + checkRequest(createCreatePartitionsRequestWithAssignments(version), false); + checkErrorResponse(createCreatePartitionsRequest(version), unknownServerException, true); + checkResponse(createCreatePartitionsResponse(), version, true); } checkRequest(createInitPidRequest(), true); @@ -518,75 +523,75 @@ public class RequestResponseTest { @Test public void testApiVersionsSerialization() { - for (short v : ApiKeys.API_VERSIONS.allVersions()) { - checkRequest(createApiVersionRequest(v), true); - checkErrorResponse(createApiVersionRequest(v), unknownServerException, true); - checkErrorResponse(createApiVersionRequest(v), new UnsupportedVersionException("Not Supported"), true); - checkResponse(createApiVersionResponse(), v, true); - checkResponse(ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), v, true); + for (short version : ApiKeys.API_VERSIONS.allVersions()) { + checkRequest(createApiVersionRequest(version), true); + checkErrorResponse(createApiVersionRequest(version), unknownServerException, true); + checkErrorResponse(createApiVersionRequest(version), new UnsupportedVersionException("Not Supported"), true); + checkResponse(createApiVersionResponse(), version, true); + checkResponse(ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), version, true); } } @Test public void testBrokerHeartbeatSerialization() { - for (short v : ApiKeys.BROKER_HEARTBEAT.allVersions()) { - checkRequest(createBrokerHeartbeatRequest(v), true); - checkErrorResponse(createBrokerHeartbeatRequest(v), unknownServerException, true); - checkResponse(createBrokerHeartbeatResponse(), v, true); + for (short version : ApiKeys.BROKER_HEARTBEAT.allVersions()) { + checkRequest(createBrokerHeartbeatRequest(version), true); + checkErrorResponse(createBrokerHeartbeatRequest(version), unknownServerException, true); + checkResponse(createBrokerHeartbeatResponse(), version, true); } } @Test public void testBrokerRegistrationSerialization() { - for (short v : ApiKeys.BROKER_REGISTRATION.allVersions()) { - checkRequest(createBrokerRegistrationRequest(v), true); - checkErrorResponse(createBrokerRegistrationRequest(v), unknownServerException, true); + for (short version : ApiKeys.BROKER_REGISTRATION.allVersions()) { + checkRequest(createBrokerRegistrationRequest(version), true); + checkErrorResponse(createBrokerRegistrationRequest(version), unknownServerException, true); checkResponse(createBrokerRegistrationResponse(), 0, true); } } @Test public void testDescribeProducersSerialization() { - for (short v : ApiKeys.DESCRIBE_PRODUCERS.allVersions()) { - checkRequest(createDescribeProducersRequest(v), true); - checkErrorResponse(createDescribeProducersRequest(v), unknownServerException, true); - checkResponse(createDescribeProducersResponse(), v, true); + for (short version : ApiKeys.DESCRIBE_PRODUCERS.allVersions()) { + checkRequest(createDescribeProducersRequest(version), true); + checkErrorResponse(createDescribeProducersRequest(version), unknownServerException, true); + checkResponse(createDescribeProducersResponse(), version, true); } } @Test public void testDescribeTransactionsSerialization() { - for (short v : ApiKeys.DESCRIBE_TRANSACTIONS.allVersions()) { - checkRequest(createDescribeTransactionsRequest(v), true); - checkErrorResponse(createDescribeTransactionsRequest(v), unknownServerException, true); - checkResponse(createDescribeTransactionsResponse(), v, true); + for (short version : ApiKeys.DESCRIBE_TRANSACTIONS.allVersions()) { + checkRequest(createDescribeTransactionsRequest(version), true); + checkErrorResponse(createDescribeTransactionsRequest(version), unknownServerException, true); + checkResponse(createDescribeTransactionsResponse(), version, true); } } @Test public void testListTransactionsSerialization() { - for (short v : ApiKeys.LIST_TRANSACTIONS.allVersions()) { - checkRequest(createListTransactionsRequest(v), true); - checkErrorResponse(createListTransactionsRequest(v), unknownServerException, true); - checkResponse(createListTransactionsResponse(), v, true); + for (short version : ApiKeys.LIST_TRANSACTIONS.allVersions()) { + checkRequest(createListTransactionsRequest(version), true); + checkErrorResponse(createListTransactionsRequest(version), unknownServerException, true); + checkResponse(createListTransactionsResponse(), version, true); } } @Test public void testDescribeClusterSerialization() { - for (short v : ApiKeys.DESCRIBE_CLUSTER.allVersions()) { - checkRequest(createDescribeClusterRequest(v), true); - checkErrorResponse(createDescribeClusterRequest(v), unknownServerException, true); - checkResponse(createDescribeClusterResponse(), v, true); + for (short version : ApiKeys.DESCRIBE_CLUSTER.allVersions()) { + checkRequest(createDescribeClusterRequest(version), true); + checkErrorResponse(createDescribeClusterRequest(version), unknownServerException, true); + checkResponse(createDescribeClusterResponse(), version, true); } } @Test public void testUnregisterBrokerSerialization() { - for (short v : ApiKeys.UNREGISTER_BROKER.allVersions()) { - checkRequest(createUnregisterBrokerRequest(v), true); - checkErrorResponse(createUnregisterBrokerRequest(v), unknownServerException, true); - checkResponse(createUnregisterBrokerResponse(), v, true); + for (short version : ApiKeys.UNREGISTER_BROKER.allVersions()) { + checkRequest(createUnregisterBrokerRequest(version), true); + checkErrorResponse(createUnregisterBrokerRequest(version), unknownServerException, true); + checkResponse(createUnregisterBrokerResponse(), version, true); } } @@ -623,13 +628,12 @@ public class RequestResponseTest { } private void checkOlderFetchVersions() { - int latestVersion = FETCH.latestVersion(); - for (int i = 0; i < latestVersion; ++i) { - if (i > 7) { - checkErrorResponse(createFetchRequest(i), unknownServerException, true); + for (short version : FETCH.allVersions()) { + if (version > 7) { + checkErrorResponse(createFetchRequest(version), unknownServerException, true); } - checkRequest(createFetchRequest(i), true); - checkResponse(createFetchResponse(i >= 4), i, true); + checkRequest(createFetchRequest(version), true); + checkResponse(createFetchResponse(version >= 4), version, true); } } @@ -668,12 +672,11 @@ public class RequestResponseTest { } private void checkDescribeConfigsResponseVersions() { - for (int version = ApiKeys.DESCRIBE_CONFIGS.oldestVersion(); version < ApiKeys.DESCRIBE_CONFIGS.latestVersion(); ++version) { - short apiVersion = (short) version; - DescribeConfigsResponse response = createDescribeConfigsResponse(apiVersion); + for (short version : ApiKeys.DESCRIBE_CONFIGS.allVersions()) { + DescribeConfigsResponse response = createDescribeConfigsResponse(version); DescribeConfigsResponse deserialized0 = (DescribeConfigsResponse) AbstractResponse.parseResponse(ApiKeys.DESCRIBE_CONFIGS, - response.serialize(apiVersion), apiVersion); - verifyDescribeConfigsResponse(response, deserialized0, apiVersion); + response.serialize(version), version); + verifyDescribeConfigsResponse(response, deserialized0, version); } } @@ -859,7 +862,7 @@ public class RequestResponseTest { verifyFetchResponseFullWrite(FETCH.latestVersion(), createFetchResponse(123)); verifyFetchResponseFullWrite(FETCH.latestVersion(), createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123)); - for (short version = 0; version <= FETCH.latestVersion(); version++) { + for (short version : FETCH.allVersions()) { verifyFetchResponseFullWrite(version, createFetchResponse(version >= 4)); } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java index 50c69746139..3446d498c6a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaRequestTest.java @@ -66,7 +66,7 @@ public class StopReplicaRequestTest { } } - for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { + for (short version : STOP_REPLICA.allVersions()) { StopReplicaRequest.Builder builder = new StopReplicaRequest.Builder(version, 0, 0, 0L, false, topicStates); StopReplicaRequest request = builder.build(); @@ -93,7 +93,7 @@ public class StopReplicaRequestTest { Map expectedPartitionStates = StopReplicaRequestTest.partitionStates(topicStates); - for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { + for (short version : STOP_REPLICA.allVersions()) { StopReplicaRequest request = new StopReplicaRequest.Builder(version, 0, 1, 0, deletePartitions, topicStates).build(version); StopReplicaRequestData data = request.data(); @@ -128,7 +128,7 @@ public class StopReplicaRequestTest { public void testTopicStatesNormalization() { List topicStates = topicStates(true); - for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { + for (short version : STOP_REPLICA.allVersions()) { // Create a request for version to get its serialized form StopReplicaRequest baseRequest = new StopReplicaRequest.Builder(version, 0, 1, 0, true, topicStates).build(version); @@ -163,7 +163,7 @@ public class StopReplicaRequestTest { public void testPartitionStatesNormalization() { List topicStates = topicStates(true); - for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { + for (short version : STOP_REPLICA.allVersions()) { // Create a request for version to get its serialized form StopReplicaRequest baseRequest = new StopReplicaRequest.Builder(version, 0, 1, 0, true, topicStates).build(version); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java index c3d049d482c..a0a5eda01e8 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java @@ -44,7 +44,7 @@ public class StopReplicaResponseTest { new StopReplicaPartitionState().setPartitionIndex(0), new StopReplicaPartitionState().setPartitionIndex(1)))); - for (short version = STOP_REPLICA.oldestVersion(); version <= STOP_REPLICA.latestVersion(); version++) { + for (short version : STOP_REPLICA.allVersions()) { StopReplicaRequest request = new StopReplicaRequest.Builder(version, 15, 20, 0, false, topicStates).build(version); StopReplicaResponse response = request diff --git a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java index ec31e587c81..037066e9a03 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java @@ -116,7 +116,7 @@ public class TxnOffsetCommitRequestTest extends OffsetCommitRequestTest { )) ); - for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) { + for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) { final TxnOffsetCommitRequest request; if (version < 3) { request = builder.build(version); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java index f3510f17251..1f19ff2c937 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java @@ -54,7 +54,7 @@ public class TxnOffsetCommitResponseTest extends OffsetCommitResponseTest { .setErrorCode(errorTwo.code()))) )); - for (short version = 0; version <= ApiKeys.TXN_OFFSET_COMMIT.latestVersion(); version++) { + for (short version : ApiKeys.TXN_OFFSET_COMMIT.allVersions()) { TxnOffsetCommitResponse response = TxnOffsetCommitResponse.parse( MessageUtil.toByteBuffer(data, version), version); assertEquals(expectedErrorCounts, response.errorCounts()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java index 86178ca8521..6f9d5c24546 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/UpdateMetadataRequestTest.java @@ -61,7 +61,7 @@ public class UpdateMetadataRequestTest { @Test public void testGetErrorResponse() { - for (short version = UPDATE_METADATA.oldestVersion(); version < UPDATE_METADATA.latestVersion(); version++) { + for (short version : UPDATE_METADATA.allVersions()) { UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder( version, 0, 0, 0, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap()); UpdateMetadataRequest request = builder.build(); @@ -81,7 +81,7 @@ public class UpdateMetadataRequestTest { public void testVersionLogic() { String topic0 = "topic0"; String topic1 = "topic1"; - for (short version = UPDATE_METADATA.oldestVersion(); version <= UPDATE_METADATA.latestVersion(); version++) { + for (short version : UPDATE_METADATA.allVersions()) { List partitionStates = asList( new UpdateMetadataPartitionState() .setTopicName(topic0) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java index 6435845b766..13e8c8cd940 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java @@ -51,7 +51,7 @@ public class WriteTxnMarkersRequestTest { @Test public void testConstructor() { WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(), markers); - for (short version = 0; version <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(); version++) { + for (short version : ApiKeys.WRITE_TXN_MARKERS.allVersions()) { WriteTxnMarkersRequest request = builder.build(version); assertEquals(1, request.markers().size()); WriteTxnMarkersRequest.TxnMarkerEntry marker = request.markers().get(0); @@ -66,7 +66,7 @@ public class WriteTxnMarkersRequestTest { @Test public void testGetErrorResponse() { WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(), markers); - for (short version = 0; version <= ApiKeys.WRITE_TXN_MARKERS.latestVersion(); version++) { + for (short version : ApiKeys.WRITE_TXN_MARKERS.allVersions()) { WriteTxnMarkersRequest request = builder.build(version); WriteTxnMarkersResponse errorResponse = request.getErrorResponse(throttleTimeMs, Errors.UNKNOWN_PRODUCER_ID.exception());