KAFKA-5584; Fix integer overflow in Log.size

It may lead to wrong metrics and it may break
size-based retention.

Author: Gregor Uhlenheuer <kongo2002@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3521 from kongo2002/KAFKA-5584
This commit is contained in:
Gregor Uhlenheuer 2017-07-12 16:56:09 -07:00 committed by Ismael Juma
parent 0547a08254
commit 2d2e9adb5d
2 changed files with 23 additions and 1 deletions

View File

@ -1144,7 +1144,7 @@ class Log(@volatile var dir: File,
/**
* The size of the log in bytes
*/
def size: Long = logSegments.map(_.size).sum
def size: Long = Log.sizeInBytes(logSegments)
/**
* The offset metadata of the next message that will be appended to the log
@ -1647,6 +1647,15 @@ object Log {
def offsetFromFilename(filename: String): Long =
filename.substring(0, filename.indexOf('.')).toLong
/**
* Calculate a log's size (in bytes) based on its log segments
*
* @param segments The log segments to calculate the size of
* @return Sum of the log segments' sizes (in bytes)
*/
def sizeInBytes(segments: Iterable[LogSegment]): Long =
segments.map(_.size.toLong).sum
/**
* Parse the topic and partition out of the directory name of a log
*/

View File

@ -198,6 +198,19 @@ class LogTest {
}
}
@Test
def testSizeForLargeLogs(): Unit = {
val largeSize = Int.MaxValue.toLong * 2
val logSegment = EasyMock.createMock(classOf[LogSegment])
EasyMock.expect(logSegment.size).andReturn(Int.MaxValue).anyTimes
EasyMock.replay(logSegment)
assertEquals(Int.MaxValue, Log.sizeInBytes(Seq(logSegment)))
assertEquals(largeSize, Log.sizeInBytes(Seq(logSegment, logSegment)))
assertTrue(Log.sizeInBytes(Seq(logSegment, logSegment)) > Int.MaxValue)
}
@Test
def testPidMapOffsetUpdatedForNonIdempotentData() {
val log = createLog(2048)