MINOR: Remove unnecessary test conditions where ListOffsetsRequest version is 0 (#19738)

Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Yunchi Pang 2025-05-18 09:20:24 -07:00 committed by GitHub
parent f26974b16d
commit 6596ba3a78
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 29 additions and 34 deletions

View File

@ -21,10 +21,12 @@ import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartit
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import java.util.{Optional, Properties}
import scala.collection.Seq
@ -205,46 +207,39 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertEquals((9L, firstLeaderEpoch, Errors.NONE.code), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
}
@Test
def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.LIST_OFFSETS)
def testResponseDefaultOffsetAndLeaderEpochForAllVersions(version: Short): Unit = {
val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3)
val firstLeaderId = partitionToLeader(partition.partition)
TestUtils.generateAndProduceMessages(brokers, topic, 9)
TestUtils.produceMessage(brokers, topic, "test-10", System.currentTimeMillis() + 10L)
for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) {
if (version == 0) {
assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
} else if (version >= 1 && version <= 3) {
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
} else if (version >= 4 && version <= 6) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
} else if (version == 7) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
} else if (version >= 8) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
}
if (version >= 1 && version <= 3) {
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version))
} else if (version >= 4 && version <= 6) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version))
} else if (version == 7) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version))
assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version))
assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version))
} else if (version >= 8) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version))
assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version))
}
}