KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1

Couple of failures observed after KAFKA-9627: Replace ListOffset request/response with automated protocol (https://github.com/apache/kafka/pull/8295)

1. Latest consumer fails to consume from 0.10.0.1 brokers. Below system tests are failing
kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest
kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest

Solution: Current default value for MaxNumOffsets is 0. because to this brokers are not returning offsets for v0 request. Set default value for MaxNumOffsets field to 1.  This is similar to previous [approach]
(https://github.com/apache/kafka/blob/2.6/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java#L204)

2. In some scenarios, latest consumer fails with below error when connecting to a Kafka cluster which consists of newer and older (<=2.0) Kafka brokers
`org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default currentLeaderEpoch at version 3`

Solution: After #8295, consumer can set non-default CurrentLeaderEpoch value for v3 and below requests. One solution is to make CurrentLeaderEpoch ignorable.

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: David Jacot <djacot@confluent.io>

Closes #9540 from omkreddy/fix-listoffsets
This commit is contained in:
Manikumar Reddy 2020-11-02 23:39:03 +05:30
parent d7aaa7007c
commit 236d7dc890
3 changed files with 32 additions and 15 deletions

View File

@ -42,11 +42,11 @@
"about": "Each partition in the request.", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1",
{ "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1", "ignorable": true,
"about": "The current leader epoch." },
{ "name": "Timestamp", "type": "int64", "versions": "0+",
"about": "The current timestamp." },
{ "name": "MaxNumOffsets", "type": "int32", "versions": "0",
{ "name": "MaxNumOffsets", "type": "int32", "versions": "0", "default": "1",
"about": "The maximum number of offsets to report." }
]}
]}

View File

@ -1243,7 +1243,8 @@ public class RequestResponseTest {
.setPartitions(Arrays.asList(new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(1000000L)
.setMaxNumOffsets(10)));
.setMaxNumOffsets(10)
.setCurrentLeaderEpoch(5)));
return ListOffsetRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(Collections.singletonList(topic))
@ -1253,7 +1254,8 @@ public class RequestResponseTest {
.setName("test")
.setPartitions(Arrays.asList(new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(1000000L)));
.setTimestamp(1000000L)
.setCurrentLeaderEpoch(5)));
return ListOffsetRequest.Builder
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(Collections.singletonList(topic))
@ -1261,10 +1263,9 @@ public class RequestResponseTest {
} else if (version >= 2 && version <= 5) {
ListOffsetPartition partition = new ListOffsetPartition()
.setPartitionIndex(0)
.setTimestamp(1000000L);
if (version >= 4) {
partition.setCurrentLeaderEpoch(5);
}
.setTimestamp(1000000L)
.setCurrentLeaderEpoch(5);
ListOffsetTopic topic = new ListOffsetTopic()
.setName("test")
.setPartitions(Arrays.asList(partition));

View File

@ -143,7 +143,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
val partitionData = response.topics.asScala.find(_.name == topic).get
.partitions.asScala.find(_.partitionIndex == partition.partition).get
(partitionData.offset, partitionData.leaderEpoch)
if (version == 0) {
if (partitionData.oldStyleOffsets().isEmpty)
(-1, partitionData.leaderEpoch)
else
(partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch)
} else
(partitionData.offset, partitionData.leaderEpoch)
}
@Test
@ -174,17 +180,27 @@ class ListOffsetsRequestTest extends BaseRequestTest {
}
@Test
def testResponseDefaultOffsetAndLeaderEpochForLowerVersions(): Unit = {
def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
val firstLeaderId = partitionToLeader(partition.partition)
TestUtils.generateAndProduceMessages(servers, topic, 10)
assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4))
for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) {
if (version == 0) {
assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
} else if (version >= 1 && version <= 3) {
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
} else if (version >= 4) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
}
}
}
private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = {