MINOR: Add log segment unit tests, If the maximum offset beyond index, appen… (#13584)

Add log segment unit tests, If the maximum offset beyond index, appended to the log section, it throws LogSegmentOffsetOverflowException

*More detailed description of change
The unit tests for the log segment should cover more scenario validation, and the unit tests submitted this time cover LogSegmentOffsetOverflowException that are expected to be thrown when the maximum index is exceeded

Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>
This commit is contained in:
蓝士钦 2023-06-01 17:06:12 +08:00 committed by GitHub
parent e33172924e
commit e1913d2e4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 0 deletions

View File

@ -16,6 +16,7 @@
*/
package kafka.log
import kafka.common.LogSegmentOffsetOverflowException
import kafka.utils.TestUtils
import kafka.utils.TestUtils.checkEquals
import org.apache.kafka.common.TopicPartition
@ -27,6 +28,8 @@ import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{BatchMetadata, EpochEntry, LogConfig, ProducerStateEntry, ProducerStateManager, ProducerStateManagerConfig, RollParams}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import java.io.File
import java.util
@ -65,6 +68,31 @@ class LogSegmentTest {
Utils.delete(logDir)
}
/**
* LogSegmentOffsetOverflowException should be thrown while appending the logs if:
* 1. largestOffset - baseOffset < 0
* 2. largestOffset - baseOffset > Integer.MAX_VALUE
*/
@ParameterizedTest
@CsvSource(Array(
"0, -2147483648",
"0, 2147483648",
"1, 0",
"100, 10",
"2147483648, 0",
"-2147483648, 0",
"2147483648,4294967296"
))
def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {
val seg = createSegment(baseOffset)
val currentTime = Time.SYSTEM.milliseconds()
val shallowOffsetOfMaxTimestamp = largestOffset
val memoryRecords = records(0, "hello")
assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords)
})
}
/**
* A read on an empty log segment should return null
*/