diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 4674bf2013e..b11b3b97056 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -84,7 +84,6 @@ public final class MessageTest { private final String memberId = "memberId"; private final String instanceId = "instanceId"; - private final List listOfVersionsNonBatchOffsetFetch = Arrays.asList(1, 2, 3, 4, 5, 6, 7); @Test public void testAddOffsetsToTxnVersions() throws Exception { @@ -544,291 +543,83 @@ public final class MessageTest { .setThrottleTimeMs(20)); } - @Test - public void testOffsetFetchV1ToV7() throws Exception { - String groupId = "groupId"; - String topicName = "topic"; + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + public void testOffsetFetchRequestVersions(short version) throws Exception { + OffsetFetchRequestData request; - List topics = Collections.singletonList( - new OffsetFetchRequestTopic() - .setName(topicName) - .setPartitionIndexes(Collections.singletonList(5))); - testAllMessageRoundTripsOffsetFetchV0ToV7(new OffsetFetchRequestData() - .setTopics(new ArrayList<>()) - .setGroupId(groupId)); - - testAllMessageRoundTripsOffsetFetchV0ToV7(new OffsetFetchRequestData() - .setGroupId(groupId) - .setTopics(topics)); - - OffsetFetchRequestData allPartitionData = new OffsetFetchRequestData() - .setGroupId(groupId) - .setTopics(null); - - OffsetFetchRequestData requireStableData = new OffsetFetchRequestData() - .setGroupId(groupId) - .setTopics(topics) - .setRequireStable(true); - - for (int version : listOfVersionsNonBatchOffsetFetch) { - final short finalVersion = (short) version; - if (version < 2) { - assertThrows(NullPointerException.class, () -> testAllMessageRoundTripsOffsetFetchFromVersionToV7(finalVersion, allPartitionData)); - } else { - testAllMessageRoundTripsOffsetFetchFromVersionToV7((short) version, allPartitionData); - } - - if (version < 7) { - assertThrows(UnsupportedVersionException.class, () -> testAllMessageRoundTripsOffsetFetchFromVersionToV7(finalVersion, requireStableData)); - } else { - testAllMessageRoundTripsOffsetFetchFromVersionToV7(finalVersion, requireStableData); - } + if (version < 8) { + request = new OffsetFetchRequestData() + .setGroupId("groupId") + .setRequireStable(version == 7) + .setTopics(List.of( + new OffsetFetchRequestTopic() + .setName("foo") + .setPartitionIndexes(List.of(0, 1, 2)) + )); + } else { + request = new OffsetFetchRequestData() + .setRequireStable(true) + .setGroups(List.of( + new OffsetFetchRequestGroup() + .setGroupId("groupId") + .setMemberId(version >= 9 ? "memberId" : null) + .setMemberEpoch(version >= 9 ? 10 : -1) + .setTopics(List.of( + new OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(List.of(0, 1, 2)) + )) + )); } - Supplier response = - () -> new OffsetFetchResponseData() - .setTopics(Collections.singletonList( - new OffsetFetchResponseTopic() - .setName(topicName) - .setPartitions(Collections.singletonList( - new OffsetFetchResponsePartition() - .setPartitionIndex(5) - .setMetadata(null) - .setCommittedOffset(100) - .setCommittedLeaderEpoch(3) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()))))) - .setErrorCode(Errors.NOT_COORDINATOR.code()) - .setThrottleTimeMs(10); - for (int version : listOfVersionsNonBatchOffsetFetch) { - OffsetFetchResponseData responseData = response.get(); - if (version <= 1) { - responseData.setErrorCode(Errors.NONE.code()); - } - - if (version <= 2) { - responseData.setThrottleTimeMs(0); - } - - if (version <= 4) { - responseData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1); - } - - testAllMessageRoundTripsOffsetFetchFromVersionToV7((short) version, responseData); - } + testMessageRoundTrip(version, request, request); } - private void testAllMessageRoundTripsOffsetFetchV0ToV7(Message message) throws Exception { - testDuplication(message); - testAllMessageRoundTripsOffsetFetchFromVersionToV7(message.lowestSupportedVersion(), message); - } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + public void testOffsetFetchResponseVersions(short version) throws Exception { + OffsetFetchResponseData response; - private void testAllMessageRoundTripsOffsetFetchFromVersionToV7(short fromVersion, - Message message) throws Exception { - for (short version = fromVersion; version <= 7; version++) { - testEquivalentMessageRoundTrip(version, message); - } - } - - @Test - public void testOffsetFetchV8AndAboveSingleGroup() throws Exception { - String groupId = "groupId"; - String topicName = "topic"; - - List topic = Collections.singletonList( - new OffsetFetchRequestTopics() - .setName(topicName) - .setPartitionIndexes(Collections.singletonList(5))); - - OffsetFetchRequestData allPartitionData = new OffsetFetchRequestData() - .setGroups(Collections.singletonList( - new OffsetFetchRequestGroup() - .setGroupId(groupId) - .setTopics(null))); - - OffsetFetchRequestData specifiedPartitionData = new OffsetFetchRequestData() - .setGroups(Collections.singletonList( - new OffsetFetchRequestGroup() - .setGroupId(groupId) - .setTopics(topic))) - .setRequireStable(true); - - testAllMessageRoundTripsOffsetFetchV8AndAbove(allPartitionData); - testAllMessageRoundTripsOffsetFetchV8AndAbove(specifiedPartitionData); - - for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - if (version >= 8) { - testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, specifiedPartitionData); - testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, allPartitionData); - } - } - - Supplier response = - () -> new OffsetFetchResponseData() - .setGroups(Collections.singletonList( + if (version < 8) { + response = new OffsetFetchResponseData() + .setThrottleTimeMs(version >= 3 ? 1000 : 0) + .setErrorCode(version >= 2 ? Errors.INVALID_GROUP_ID.code() : 0) + .setTopics(List.of( + new OffsetFetchResponseTopic() + .setName("foo") + .setPartitions(List.of( + new OffsetFetchResponsePartition() + .setPartitionIndex(0) + .setCommittedOffset(10) + .setMetadata("meta") + .setCommittedLeaderEpoch(version >= 5 ? 20 : -1) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + )) + )); + } else { + response = new OffsetFetchResponseData() + .setThrottleTimeMs(1000) + .setGroups(List.of( new OffsetFetchResponseGroup() - .setGroupId(groupId) - .setTopics(Collections.singletonList( + .setGroupId("groupId") + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setTopics(List.of( new OffsetFetchResponseTopics() - .setPartitions(Collections.singletonList( + .setName("foo") + .setPartitions(List.of( new OffsetFetchResponsePartitions() - .setPartitionIndex(5) - .setMetadata(null) - .setCommittedOffset(100) - .setCommittedLeaderEpoch(3) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()))))) - .setErrorCode(Errors.NOT_COORDINATOR.code()))) - .setThrottleTimeMs(10); - for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - if (version >= 8) { - OffsetFetchResponseData responseData = response.get(); - testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, responseData); - } - } - } - - @Test - public void testOffsetFetchV8AndAbove() throws Exception { - String groupOne = "group1"; - String groupTwo = "group2"; - String groupThree = "group3"; - String groupFour = "group4"; - String groupFive = "group5"; - String topic1 = "topic1"; - String topic2 = "topic2"; - String topic3 = "topic3"; - - OffsetFetchRequestTopics topicOne = new OffsetFetchRequestTopics() - .setName(topic1) - .setPartitionIndexes(Collections.singletonList(5)); - OffsetFetchRequestTopics topicTwo = new OffsetFetchRequestTopics() - .setName(topic2) - .setPartitionIndexes(Collections.singletonList(10)); - OffsetFetchRequestTopics topicThree = new OffsetFetchRequestTopics() - .setName(topic3) - .setPartitionIndexes(Collections.singletonList(15)); - - List groupOneTopics = singletonList(topicOne); - OffsetFetchRequestGroup group1 = - new OffsetFetchRequestGroup() - .setGroupId(groupOne) - .setTopics(groupOneTopics); - - List groupTwoTopics = Arrays.asList(topicOne, topicTwo); - OffsetFetchRequestGroup group2 = - new OffsetFetchRequestGroup() - .setGroupId(groupTwo) - .setTopics(groupTwoTopics); - - List groupThreeTopics = Arrays.asList(topicOne, topicTwo, topicThree); - OffsetFetchRequestGroup group3 = - new OffsetFetchRequestGroup() - .setGroupId(groupThree) - .setTopics(groupThreeTopics); - - OffsetFetchRequestGroup group4 = - new OffsetFetchRequestGroup() - .setGroupId(groupFour) - .setTopics(null); - - OffsetFetchRequestGroup group5 = - new OffsetFetchRequestGroup() - .setGroupId(groupFive) - .setTopics(null); - - OffsetFetchRequestData requestData = new OffsetFetchRequestData() - .setGroups(Arrays.asList(group1, group2, group3, group4, group5)) - .setRequireStable(true); - - testAllMessageRoundTripsOffsetFetchV8AndAbove(requestData); - - testAllMessageRoundTripsOffsetFetchV8AndAbove(requestData.setRequireStable(false)); - - - for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - if (version >= 8) { - testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, requestData); - } + .setPartitionIndex(0) + .setCommittedOffset(10) + .setMetadata("meta") + .setCommittedLeaderEpoch(20) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + )) + )) + )); } - OffsetFetchResponseTopics responseTopic1 = - new OffsetFetchResponseTopics() - .setName(topic1) - .setPartitions(Collections.singletonList( - new OffsetFetchResponsePartitions() - .setPartitionIndex(5) - .setMetadata(null) - .setCommittedOffset(100) - .setCommittedLeaderEpoch(3) - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()))); - OffsetFetchResponseTopics responseTopic2 = - new OffsetFetchResponseTopics() - .setName(topic2) - .setPartitions(Collections.singletonList( - new OffsetFetchResponsePartitions() - .setPartitionIndex(10) - .setMetadata("foo") - .setCommittedOffset(200) - .setCommittedLeaderEpoch(2) - .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()))); - OffsetFetchResponseTopics responseTopic3 = - new OffsetFetchResponseTopics() - .setName(topic3) - .setPartitions(Collections.singletonList( - new OffsetFetchResponsePartitions() - .setPartitionIndex(15) - .setMetadata("bar") - .setCommittedOffset(300) - .setCommittedLeaderEpoch(1) - .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code()))); - - OffsetFetchResponseGroup responseGroup1 = - new OffsetFetchResponseGroup() - .setGroupId(groupOne) - .setTopics(Collections.singletonList(responseTopic1)) - .setErrorCode(Errors.NOT_COORDINATOR.code()); - OffsetFetchResponseGroup responseGroup2 = - new OffsetFetchResponseGroup() - .setGroupId(groupTwo) - .setTopics(Arrays.asList(responseTopic1, responseTopic2)) - .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); - OffsetFetchResponseGroup responseGroup3 = - new OffsetFetchResponseGroup() - .setGroupId(groupThree) - .setTopics(Arrays.asList(responseTopic1, responseTopic2, responseTopic3)) - .setErrorCode(Errors.NONE.code()); - OffsetFetchResponseGroup responseGroup4 = - new OffsetFetchResponseGroup() - .setGroupId(groupFour) - .setTopics(Arrays.asList(responseTopic1, responseTopic2, responseTopic3)) - .setErrorCode(Errors.NONE.code()); - OffsetFetchResponseGroup responseGroup5 = - new OffsetFetchResponseGroup() - .setGroupId(groupFive) - .setTopics(Arrays.asList(responseTopic1, responseTopic2, responseTopic3)) - .setErrorCode(Errors.NONE.code()); - - Supplier response = - () -> new OffsetFetchResponseData() - .setGroups(Arrays.asList(responseGroup1, responseGroup2, responseGroup3, - responseGroup4, responseGroup5)) - .setThrottleTimeMs(10); - for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { - if (version >= 8) { - OffsetFetchResponseData responseData = response.get(); - testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, responseData); - } - } - } - - private void testAllMessageRoundTripsOffsetFetchV8AndAbove(Message message) throws Exception { - testDuplication(message); - testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove((short) 8, message); - } - - private void testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(short fromVersion, Message message) throws Exception { - for (short version = fromVersion; version <= message.highestSupportedVersion(); version++) { - testEquivalentMessageRoundTrip(version, message); - } + testMessageRoundTrip(version, response, response); } @Test