KAFKA-2024: Log compaction can generate unindexable segments.

Author: Rajini Sivaram, 2015-04-04 15:56:17 -07:00 (committed by Jay Kreps)
Parent: a686a67f60
Commit: 7acfa92c09
2 changed files with 48 additions and 1 deletion

core/src/main/scala/kafka/log/LogCleaner.scala

@@ -483,7 +483,8 @@ private[log] class Cleaner(val id: Int,
       segs = segs.tail
       while(!segs.isEmpty &&
             logSize + segs.head.size < maxSize &&
-            indexSize + segs.head.index.sizeInBytes < maxIndexSize) {
+            indexSize + segs.head.index.sizeInBytes < maxIndexSize &&
+            segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
         group = segs.head :: group
         logSize += segs.head.size
         indexSize += segs.head.index.sizeInBytes
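
The added condition bounds the span of offsets that a merged group may cover. As the test comment below notes, the offset index encodes each entry's offset relative to the segment's base offset in 4 bytes, so a group spanning more than Int.MaxValue offsets could not be indexed. A minimal sketch of that constraint (illustrative only, not the actual OffsetIndex source):

    // Sketch: an index entry stores (offset - baseOffset) narrowed to an Int,
    // so every offset in a segment must lie within Int.MaxValue of its base.
    def relativeOffset(baseOffset: Long, offset: Long): Int = {
      val delta = offset - baseOffset
      require(delta >= 0 && delta <= Int.MaxValue,
        s"offset $offset is not indexable relative to base $baseOffset")
      delta.toInt
    }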

core/src/test/scala/unit/kafka/log/CleanerTest.scala

@@ -26,6 +26,7 @@ import scala.collection._
 import kafka.common._
 import kafka.utils._
 import kafka.message._
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * Unit tests for the log cleaning logic
@@ -197,6 +198,51 @@ class CleanerTest extends JUnitSuite {
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
   }
 
+  /**
+   * Validate the logic for grouping log segments together for cleaning when only a small number of
+   * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
+   * contain a range of offsets greater than Int.MaxValue to ensure that relative offsets can be
+   * stored in 4 bytes.
+   */
+  @Test
+  def testSegmentGroupingWithSparseOffsets() {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val log = makeLog(config = logConfig.copy(segmentSize = 1024, indexInterval = 1))
+
+    // fill up first segment
+    while (log.numberOfSegments == 1)
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+
+    // forward offset and append message to next segment at offset Int.MaxValue
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(Int.MaxValue - 1), new Message("hello".getBytes, "hello".getBytes))
+    log.append(messageSet, assignOffsets = false)
+    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
+
+    // grouping should result in a single group with maximum relative offset of Int.MaxValue
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(1, groups.size)
+
+    // append another message, making last offset of second segment > Int.MaxValue
+    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+
+    // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(2, groups.size)
+    checkSegmentOrder(groups)
+
+    // append more messages, creating new segments, further grouping should still occur
+    while (log.numberOfSegments < 4)
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(log.numberOfSegments - 1, groups.size)
+    for (group <- groups)
+      assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
+    checkSegmentOrder(groups)
+  }
+
+  private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]) {
+    val offsets = groups.flatMap(_.map(_.baseOffset))
+    assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)