mirror of https://github.com/apache/kafka.git
MINOR: Cleanup OffsetFetchRequest/Response in MessageTest (#19576)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
The tests related of OffsetFetch request/response in MessageTest are incomprehensible. This patch rewrites them in a simpler way. Reviewers: TengYao Chi <frankvicky@apache.org>
This commit is contained in:
parent
443c01ca80
commit
6d67d82d5b
|
@ -84,7 +84,6 @@ public final class MessageTest {
|
||||||
|
|
||||||
private final String memberId = "memberId";
|
private final String memberId = "memberId";
|
||||||
private final String instanceId = "instanceId";
|
private final String instanceId = "instanceId";
|
||||||
private final List<Integer> listOfVersionsNonBatchOffsetFetch = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAddOffsetsToTxnVersions() throws Exception {
|
public void testAddOffsetsToTxnVersions() throws Exception {
|
||||||
|
@ -544,291 +543,83 @@ public final class MessageTest {
|
||||||
.setThrottleTimeMs(20));
|
.setThrottleTimeMs(20));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testOffsetFetchV1ToV7() throws Exception {
|
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
|
||||||
String groupId = "groupId";
|
public void testOffsetFetchRequestVersions(short version) throws Exception {
|
||||||
String topicName = "topic";
|
OffsetFetchRequestData request;
|
||||||
|
|
||||||
List<OffsetFetchRequestTopic> topics = Collections.singletonList(
|
if (version < 8) {
|
||||||
|
request = new OffsetFetchRequestData()
|
||||||
|
.setGroupId("groupId")
|
||||||
|
.setRequireStable(version == 7)
|
||||||
|
.setTopics(List.of(
|
||||||
new OffsetFetchRequestTopic()
|
new OffsetFetchRequestTopic()
|
||||||
.setName(topicName)
|
.setName("foo")
|
||||||
.setPartitionIndexes(Collections.singletonList(5)));
|
.setPartitionIndexes(List.of(0, 1, 2))
|
||||||
testAllMessageRoundTripsOffsetFetchV0ToV7(new OffsetFetchRequestData()
|
));
|
||||||
.setTopics(new ArrayList<>())
|
|
||||||
.setGroupId(groupId));
|
|
||||||
|
|
||||||
testAllMessageRoundTripsOffsetFetchV0ToV7(new OffsetFetchRequestData()
|
|
||||||
.setGroupId(groupId)
|
|
||||||
.setTopics(topics));
|
|
||||||
|
|
||||||
OffsetFetchRequestData allPartitionData = new OffsetFetchRequestData()
|
|
||||||
.setGroupId(groupId)
|
|
||||||
.setTopics(null);
|
|
||||||
|
|
||||||
OffsetFetchRequestData requireStableData = new OffsetFetchRequestData()
|
|
||||||
.setGroupId(groupId)
|
|
||||||
.setTopics(topics)
|
|
||||||
.setRequireStable(true);
|
|
||||||
|
|
||||||
for (int version : listOfVersionsNonBatchOffsetFetch) {
|
|
||||||
final short finalVersion = (short) version;
|
|
||||||
if (version < 2) {
|
|
||||||
assertThrows(NullPointerException.class, () -> testAllMessageRoundTripsOffsetFetchFromVersionToV7(finalVersion, allPartitionData));
|
|
||||||
} else {
|
} else {
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionToV7((short) version, allPartitionData);
|
request = new OffsetFetchRequestData()
|
||||||
}
|
.setRequireStable(true)
|
||||||
|
.setGroups(List.of(
|
||||||
if (version < 7) {
|
new OffsetFetchRequestGroup()
|
||||||
assertThrows(UnsupportedVersionException.class, () -> testAllMessageRoundTripsOffsetFetchFromVersionToV7(finalVersion, requireStableData));
|
.setGroupId("groupId")
|
||||||
} else {
|
.setMemberId(version >= 9 ? "memberId" : null)
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionToV7(finalVersion, requireStableData);
|
.setMemberEpoch(version >= 9 ? 10 : -1)
|
||||||
}
|
.setTopics(List.of(
|
||||||
}
|
|
||||||
|
|
||||||
Supplier<OffsetFetchResponseData> response =
|
|
||||||
() -> new OffsetFetchResponseData()
|
|
||||||
.setTopics(Collections.singletonList(
|
|
||||||
new OffsetFetchResponseTopic()
|
|
||||||
.setName(topicName)
|
|
||||||
.setPartitions(Collections.singletonList(
|
|
||||||
new OffsetFetchResponsePartition()
|
|
||||||
.setPartitionIndex(5)
|
|
||||||
.setMetadata(null)
|
|
||||||
.setCommittedOffset(100)
|
|
||||||
.setCommittedLeaderEpoch(3)
|
|
||||||
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())))))
|
|
||||||
.setErrorCode(Errors.NOT_COORDINATOR.code())
|
|
||||||
.setThrottleTimeMs(10);
|
|
||||||
for (int version : listOfVersionsNonBatchOffsetFetch) {
|
|
||||||
OffsetFetchResponseData responseData = response.get();
|
|
||||||
if (version <= 1) {
|
|
||||||
responseData.setErrorCode(Errors.NONE.code());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (version <= 2) {
|
|
||||||
responseData.setThrottleTimeMs(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (version <= 4) {
|
|
||||||
responseData.topics().get(0).partitions().get(0).setCommittedLeaderEpoch(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionToV7((short) version, responseData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testAllMessageRoundTripsOffsetFetchV0ToV7(Message message) throws Exception {
|
|
||||||
testDuplication(message);
|
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionToV7(message.lowestSupportedVersion(), message);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testAllMessageRoundTripsOffsetFetchFromVersionToV7(short fromVersion,
|
|
||||||
Message message) throws Exception {
|
|
||||||
for (short version = fromVersion; version <= 7; version++) {
|
|
||||||
testEquivalentMessageRoundTrip(version, message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testOffsetFetchV8AndAboveSingleGroup() throws Exception {
|
|
||||||
String groupId = "groupId";
|
|
||||||
String topicName = "topic";
|
|
||||||
|
|
||||||
List<OffsetFetchRequestTopics> topic = Collections.singletonList(
|
|
||||||
new OffsetFetchRequestTopics()
|
new OffsetFetchRequestTopics()
|
||||||
.setName(topicName)
|
.setName("foo")
|
||||||
.setPartitionIndexes(Collections.singletonList(5)));
|
.setPartitionIndexes(List.of(0, 1, 2))
|
||||||
|
))
|
||||||
OffsetFetchRequestData allPartitionData = new OffsetFetchRequestData()
|
));
|
||||||
.setGroups(Collections.singletonList(
|
|
||||||
new OffsetFetchRequestGroup()
|
|
||||||
.setGroupId(groupId)
|
|
||||||
.setTopics(null)));
|
|
||||||
|
|
||||||
OffsetFetchRequestData specifiedPartitionData = new OffsetFetchRequestData()
|
|
||||||
.setGroups(Collections.singletonList(
|
|
||||||
new OffsetFetchRequestGroup()
|
|
||||||
.setGroupId(groupId)
|
|
||||||
.setTopics(topic)))
|
|
||||||
.setRequireStable(true);
|
|
||||||
|
|
||||||
testAllMessageRoundTripsOffsetFetchV8AndAbove(allPartitionData);
|
|
||||||
testAllMessageRoundTripsOffsetFetchV8AndAbove(specifiedPartitionData);
|
|
||||||
|
|
||||||
for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
|
|
||||||
if (version >= 8) {
|
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, specifiedPartitionData);
|
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, allPartitionData);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Supplier<OffsetFetchResponseData> response =
|
testMessageRoundTrip(version, request, request);
|
||||||
() -> new OffsetFetchResponseData()
|
}
|
||||||
.setGroups(Collections.singletonList(
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
|
||||||
|
public void testOffsetFetchResponseVersions(short version) throws Exception {
|
||||||
|
OffsetFetchResponseData response;
|
||||||
|
|
||||||
|
if (version < 8) {
|
||||||
|
response = new OffsetFetchResponseData()
|
||||||
|
.setThrottleTimeMs(version >= 3 ? 1000 : 0)
|
||||||
|
.setErrorCode(version >= 2 ? Errors.INVALID_GROUP_ID.code() : 0)
|
||||||
|
.setTopics(List.of(
|
||||||
|
new OffsetFetchResponseTopic()
|
||||||
|
.setName("foo")
|
||||||
|
.setPartitions(List.of(
|
||||||
|
new OffsetFetchResponsePartition()
|
||||||
|
.setPartitionIndex(0)
|
||||||
|
.setCommittedOffset(10)
|
||||||
|
.setMetadata("meta")
|
||||||
|
.setCommittedLeaderEpoch(version >= 5 ? 20 : -1)
|
||||||
|
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
|
||||||
|
))
|
||||||
|
));
|
||||||
|
} else {
|
||||||
|
response = new OffsetFetchResponseData()
|
||||||
|
.setThrottleTimeMs(1000)
|
||||||
|
.setGroups(List.of(
|
||||||
new OffsetFetchResponseGroup()
|
new OffsetFetchResponseGroup()
|
||||||
.setGroupId(groupId)
|
.setGroupId("groupId")
|
||||||
.setTopics(Collections.singletonList(
|
.setErrorCode(Errors.INVALID_GROUP_ID.code())
|
||||||
|
.setTopics(List.of(
|
||||||
new OffsetFetchResponseTopics()
|
new OffsetFetchResponseTopics()
|
||||||
.setPartitions(Collections.singletonList(
|
.setName("foo")
|
||||||
|
.setPartitions(List.of(
|
||||||
new OffsetFetchResponsePartitions()
|
new OffsetFetchResponsePartitions()
|
||||||
.setPartitionIndex(5)
|
.setPartitionIndex(0)
|
||||||
.setMetadata(null)
|
.setCommittedOffset(10)
|
||||||
.setCommittedOffset(100)
|
.setMetadata("meta")
|
||||||
.setCommittedLeaderEpoch(3)
|
.setCommittedLeaderEpoch(20)
|
||||||
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())))))
|
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
|
||||||
.setErrorCode(Errors.NOT_COORDINATOR.code())))
|
))
|
||||||
.setThrottleTimeMs(10);
|
))
|
||||||
for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
|
));
|
||||||
if (version >= 8) {
|
|
||||||
OffsetFetchResponseData responseData = response.get();
|
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, responseData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
testMessageRoundTrip(version, response, response);
|
||||||
public void testOffsetFetchV8AndAbove() throws Exception {
|
|
||||||
String groupOne = "group1";
|
|
||||||
String groupTwo = "group2";
|
|
||||||
String groupThree = "group3";
|
|
||||||
String groupFour = "group4";
|
|
||||||
String groupFive = "group5";
|
|
||||||
String topic1 = "topic1";
|
|
||||||
String topic2 = "topic2";
|
|
||||||
String topic3 = "topic3";
|
|
||||||
|
|
||||||
OffsetFetchRequestTopics topicOne = new OffsetFetchRequestTopics()
|
|
||||||
.setName(topic1)
|
|
||||||
.setPartitionIndexes(Collections.singletonList(5));
|
|
||||||
OffsetFetchRequestTopics topicTwo = new OffsetFetchRequestTopics()
|
|
||||||
.setName(topic2)
|
|
||||||
.setPartitionIndexes(Collections.singletonList(10));
|
|
||||||
OffsetFetchRequestTopics topicThree = new OffsetFetchRequestTopics()
|
|
||||||
.setName(topic3)
|
|
||||||
.setPartitionIndexes(Collections.singletonList(15));
|
|
||||||
|
|
||||||
List<OffsetFetchRequestTopics> groupOneTopics = singletonList(topicOne);
|
|
||||||
OffsetFetchRequestGroup group1 =
|
|
||||||
new OffsetFetchRequestGroup()
|
|
||||||
.setGroupId(groupOne)
|
|
||||||
.setTopics(groupOneTopics);
|
|
||||||
|
|
||||||
List<OffsetFetchRequestTopics> groupTwoTopics = Arrays.asList(topicOne, topicTwo);
|
|
||||||
OffsetFetchRequestGroup group2 =
|
|
||||||
new OffsetFetchRequestGroup()
|
|
||||||
.setGroupId(groupTwo)
|
|
||||||
.setTopics(groupTwoTopics);
|
|
||||||
|
|
||||||
List<OffsetFetchRequestTopics> groupThreeTopics = Arrays.asList(topicOne, topicTwo, topicThree);
|
|
||||||
OffsetFetchRequestGroup group3 =
|
|
||||||
new OffsetFetchRequestGroup()
|
|
||||||
.setGroupId(groupThree)
|
|
||||||
.setTopics(groupThreeTopics);
|
|
||||||
|
|
||||||
OffsetFetchRequestGroup group4 =
|
|
||||||
new OffsetFetchRequestGroup()
|
|
||||||
.setGroupId(groupFour)
|
|
||||||
.setTopics(null);
|
|
||||||
|
|
||||||
OffsetFetchRequestGroup group5 =
|
|
||||||
new OffsetFetchRequestGroup()
|
|
||||||
.setGroupId(groupFive)
|
|
||||||
.setTopics(null);
|
|
||||||
|
|
||||||
OffsetFetchRequestData requestData = new OffsetFetchRequestData()
|
|
||||||
.setGroups(Arrays.asList(group1, group2, group3, group4, group5))
|
|
||||||
.setRequireStable(true);
|
|
||||||
|
|
||||||
testAllMessageRoundTripsOffsetFetchV8AndAbove(requestData);
|
|
||||||
|
|
||||||
testAllMessageRoundTripsOffsetFetchV8AndAbove(requestData.setRequireStable(false));
|
|
||||||
|
|
||||||
|
|
||||||
for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
|
|
||||||
if (version >= 8) {
|
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, requestData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
OffsetFetchResponseTopics responseTopic1 =
|
|
||||||
new OffsetFetchResponseTopics()
|
|
||||||
.setName(topic1)
|
|
||||||
.setPartitions(Collections.singletonList(
|
|
||||||
new OffsetFetchResponsePartitions()
|
|
||||||
.setPartitionIndex(5)
|
|
||||||
.setMetadata(null)
|
|
||||||
.setCommittedOffset(100)
|
|
||||||
.setCommittedLeaderEpoch(3)
|
|
||||||
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())));
|
|
||||||
OffsetFetchResponseTopics responseTopic2 =
|
|
||||||
new OffsetFetchResponseTopics()
|
|
||||||
.setName(topic2)
|
|
||||||
.setPartitions(Collections.singletonList(
|
|
||||||
new OffsetFetchResponsePartitions()
|
|
||||||
.setPartitionIndex(10)
|
|
||||||
.setMetadata("foo")
|
|
||||||
.setCommittedOffset(200)
|
|
||||||
.setCommittedLeaderEpoch(2)
|
|
||||||
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code())));
|
|
||||||
OffsetFetchResponseTopics responseTopic3 =
|
|
||||||
new OffsetFetchResponseTopics()
|
|
||||||
.setName(topic3)
|
|
||||||
.setPartitions(Collections.singletonList(
|
|
||||||
new OffsetFetchResponsePartitions()
|
|
||||||
.setPartitionIndex(15)
|
|
||||||
.setMetadata("bar")
|
|
||||||
.setCommittedOffset(300)
|
|
||||||
.setCommittedLeaderEpoch(1)
|
|
||||||
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())));
|
|
||||||
|
|
||||||
OffsetFetchResponseGroup responseGroup1 =
|
|
||||||
new OffsetFetchResponseGroup()
|
|
||||||
.setGroupId(groupOne)
|
|
||||||
.setTopics(Collections.singletonList(responseTopic1))
|
|
||||||
.setErrorCode(Errors.NOT_COORDINATOR.code());
|
|
||||||
OffsetFetchResponseGroup responseGroup2 =
|
|
||||||
new OffsetFetchResponseGroup()
|
|
||||||
.setGroupId(groupTwo)
|
|
||||||
.setTopics(Arrays.asList(responseTopic1, responseTopic2))
|
|
||||||
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
|
|
||||||
OffsetFetchResponseGroup responseGroup3 =
|
|
||||||
new OffsetFetchResponseGroup()
|
|
||||||
.setGroupId(groupThree)
|
|
||||||
.setTopics(Arrays.asList(responseTopic1, responseTopic2, responseTopic3))
|
|
||||||
.setErrorCode(Errors.NONE.code());
|
|
||||||
OffsetFetchResponseGroup responseGroup4 =
|
|
||||||
new OffsetFetchResponseGroup()
|
|
||||||
.setGroupId(groupFour)
|
|
||||||
.setTopics(Arrays.asList(responseTopic1, responseTopic2, responseTopic3))
|
|
||||||
.setErrorCode(Errors.NONE.code());
|
|
||||||
OffsetFetchResponseGroup responseGroup5 =
|
|
||||||
new OffsetFetchResponseGroup()
|
|
||||||
.setGroupId(groupFive)
|
|
||||||
.setTopics(Arrays.asList(responseTopic1, responseTopic2, responseTopic3))
|
|
||||||
.setErrorCode(Errors.NONE.code());
|
|
||||||
|
|
||||||
Supplier<OffsetFetchResponseData> response =
|
|
||||||
() -> new OffsetFetchResponseData()
|
|
||||||
.setGroups(Arrays.asList(responseGroup1, responseGroup2, responseGroup3,
|
|
||||||
responseGroup4, responseGroup5))
|
|
||||||
.setThrottleTimeMs(10);
|
|
||||||
for (short version : ApiKeys.OFFSET_FETCH.allVersions()) {
|
|
||||||
if (version >= 8) {
|
|
||||||
OffsetFetchResponseData responseData = response.get();
|
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(version, responseData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testAllMessageRoundTripsOffsetFetchV8AndAbove(Message message) throws Exception {
|
|
||||||
testDuplication(message);
|
|
||||||
testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove((short) 8, message);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testAllMessageRoundTripsOffsetFetchFromVersionV8AndAbove(short fromVersion, Message message) throws Exception {
|
|
||||||
for (short version = fromVersion; version <= message.highestSupportedVersion(); version++) {
|
|
||||||
testEquivalentMessageRoundTrip(version, message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue