KAFKA-806 Index may not always observe log.index.interval.bytes (#18843)

Currently, each log.append() adds at most one index entry, even when the appended data is larger than log.index.interval.bytes. One potential issue is that if a follower restarts after being down for a long time, it may fetch data much larger than log.index.interval.bytes at a time, so far fewer index entries are created than the interval implies, which can increase fetch times for consumers. With this change, LogSegment.append checks the index interval once per record batch in the appended records, so offset and time index entries are added roughly every log.index.interval.bytes regardless of how much data a single append carries.
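To make the effect concrete, here is a small self-contained toy in Java. It is not Kafka code: the class name and the 4 KiB interval / 1 KiB batch / 64-batch constants are assumptions chosen for illustration. It contrasts the old once-per-append() interval check with the per-batch check this patch introduces in LogSegment.append.

public class IndexIntervalSketch {
    // Stand-ins for the real configuration and workload; values are assumptions for the example.
    static final int INDEX_INTERVAL_BYTES = 4096;  // log.index.interval.bytes
    static final int BATCH_SIZE_BYTES = 1024;      // size of each record batch
    static final int BATCHES_IN_ONE_APPEND = 64;   // e.g. one large catch-up fetch appended by a follower

    public static void main(String[] args) {
        // Old behavior: the interval check runs once per append() call, so this
        // 64 KiB append produces at most one index entry (here: none, since the
        // byte counter starts below the interval).
        int oldEntries = 0;
        int bytesSinceLastIndexEntry = 0;
        if (bytesSinceLastIndexEntry > INDEX_INTERVAL_BYTES) {
            oldEntries++;
            bytesSinceLastIndexEntry = 0;
        }
        bytesSinceLastIndexEntry += BATCH_SIZE_BYTES * BATCHES_IN_ONE_APPEND;

        // New behavior: the interval check runs for every batch inside the append,
        // so entries keep appearing roughly once per INDEX_INTERVAL_BYTES even
        // within a single large append.
        int newEntries = 0;
        int newBytesSinceLastIndexEntry = 0;
        for (int i = 0; i < BATCHES_IN_ONE_APPEND; i++) {
            if (newBytesSinceLastIndexEntry > INDEX_INTERVAL_BYTES) {
                newEntries++;
                newBytesSinceLastIndexEntry = 0;
            }
            newBytesSinceLastIndexEntry += BATCH_SIZE_BYTES;
        }

        System.out.println("old behavior: " + oldEntries + " index entries"); // 0
        System.out.println("new behavior: " + newEntries + " index entries"); // 12 (one per ~5 batches)
    }
}

With sparser index entries, a consumer fetch that starts mid-segment has to scan more log data from the nearest preceding entry, which is the increased fetch time the description refers to.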

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
Authored by PoAn Yang on 2025-02-11 01:24:19 +08:00, committed by GitHub
parent c215b8b645
commit 27a3e5560e
13 changed files with 176 additions and 172 deletions


@ -413,8 +413,8 @@ class LocalLog(@volatile private var _dir: File,
} }
} }
private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { private[log] def append(lastOffset: Long, records: MemoryRecords): Unit = {
segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records) segments.activeSegment.append(lastOffset, records)
updateLogEndOffset(lastOffset + 1) updateLogEndOffset(lastOffset + 1)
} }


@ -816,7 +816,7 @@ private[log] class Cleaner(val id: Int,
val retained = MemoryRecords.readableRecords(outputBuffer) val retained = MemoryRecords.readableRecords(outputBuffer)
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called // after `Log.replaceSegments` (which acquires the lock) is called
dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp(), retained) dest.append(result.maxOffset, retained)
throttler.maybeThrottle(outputBuffer.limit()) throttler.maybeThrottle(outputBuffer.limit())
} }


@ -821,7 +821,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validRecords = validateAndOffsetAssignResult.validatedRecords validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs) appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)
appendInfo.setLastOffset(offset.value - 1) appendInfo.setLastOffset(offset.value - 1)
appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats) appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
@ -907,7 +906,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// will be cleaned up after the log directory is recovered. Note that the end offset of the // will be cleaned up after the log directory is recovered. Note that the end offset of the
// ProducerStateManager will not be updated and the last stable offset will not advance // ProducerStateManager will not be updated and the last stable offset will not advance
// if the append to the transaction index fails. // if the append to the transaction index fails.
localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords) localLog.append(appendInfo.lastOffset, validRecords)
updateHighWatermarkWithLogEndOffset() updateHighWatermarkWithLogEndOffset()
// update the producer state // update the producer state
@ -1193,7 +1192,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
else else
OptionalInt.empty() OptionalInt.empty()
new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, shallowOffsetOfMaxTimestamp, new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp,
RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression, RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression,
validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE) validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
} }


@ -98,8 +98,6 @@ class LocalLogTest {
log: LocalLog = log, log: LocalLog = log,
initialOffset: Long = 0L): Unit = { initialOffset: Long = 0L): Unit = {
log.append(lastOffset = initialOffset + records.size - 1, log.append(lastOffset = initialOffset + records.size - 1,
largestTimestamp = records.head.timestamp,
shallowOffsetOfMaxTimestamp = initialOffset,
records = MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*)) records = MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*))
} }


@ -34,6 +34,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource} import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import java.io.{File, RandomAccessFile} import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.util.{Optional, OptionalLong} import java.util.{Optional, OptionalLong}
import scala.collection._ import scala.collection._
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -86,11 +87,9 @@ class LogSegmentTest {
)) ))
def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = { def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {
val seg = createSegment(baseOffset) val seg = createSegment(baseOffset)
val currentTime = Time.SYSTEM.milliseconds()
val shallowOffsetOfMaxTimestamp = largestOffset
val memoryRecords = records(0, "hello") val memoryRecords = records(0, "hello")
assertThrows(classOf[LogSegmentOffsetOverflowException], () => { assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords) seg.append(largestOffset, memoryRecords)
}) })
} }
@ -112,7 +111,7 @@ class LogSegmentTest {
def testReadBeforeFirstOffset(): Unit = { def testReadBeforeFirstOffset(): Unit = {
val seg = createSegment(40) val seg = createSegment(40)
val ms = records(50, "hello", "there", "little", "bee") val ms = records(50, "hello", "there", "little", "bee")
seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms) seg.append(53, ms)
val read = seg.read(41, 300).records val read = seg.read(41, 300).records
checkEquals(ms.records.iterator, read.records.iterator) checkEquals(ms.records.iterator, read.records.iterator)
} }
@ -124,7 +123,7 @@ class LogSegmentTest {
def testReadAfterLast(): Unit = { def testReadAfterLast(): Unit = {
val seg = createSegment(40) val seg = createSegment(40)
val ms = records(50, "hello", "there") val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) seg.append(51, ms)
val read = seg.read(52, 200) val read = seg.read(52, 200)
assertNull(read, "Read beyond the last offset in the segment should give null") assertNull(read, "Read beyond the last offset in the segment should give null")
} }
@ -137,9 +136,9 @@ class LogSegmentTest {
def testReadFromGap(): Unit = { def testReadFromGap(): Unit = {
val seg = createSegment(40) val seg = createSegment(40)
val ms = records(50, "hello", "there") val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) seg.append(51, ms)
val ms2 = records(60, "alpha", "beta") val ms2 = records(60, "alpha", "beta")
seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2) seg.append(61, ms2)
val read = seg.read(55, 200) val read = seg.read(55, 200)
checkEquals(ms2.records.iterator, read.records.records.iterator) checkEquals(ms2.records.iterator, read.records.records.iterator)
} }
@ -151,7 +150,7 @@ class LogSegmentTest {
val maxSize = 1 val maxSize = 1
val seg = createSegment(40) val seg = createSegment(40)
val ms = records(50, "hello", "there") val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) seg.append(51, ms)
// read before first offset // read before first offset
var read = seg.read(48, maxSize, maxPosition, minOneMessage) var read = seg.read(48, maxSize, maxPosition, minOneMessage)
assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata) assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata)
@ -182,9 +181,9 @@ class LogSegmentTest {
var offset = 40 var offset = 40
for (_ <- 0 until 30) { for (_ <- 0 until 30) {
val ms1 = records(offset, "hello") val ms1 = records(offset, "hello")
seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1) seg.append(offset, ms1)
val ms2 = records(offset + 1, "hello") val ms2 = records(offset + 1, "hello")
seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2) seg.append(offset + 1, ms2)
// check that we can read back both messages // check that we can read back both messages
val read = seg.read(offset, 10000) val read = seg.read(offset, 10000)
assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList) assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList)
@ -243,7 +242,7 @@ class LogSegmentTest {
val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1) val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
var offset = 40 var offset = 40
for (_ <- 0 until numMessages) { for (_ <- 0 until numMessages) {
seg.append(offset, offset, offset, records(offset, "hello")) seg.append(offset, records(offset, "hello"))
offset += 1 offset += 1
} }
assertEquals(offset, seg.readNextOffset) assertEquals(offset, seg.readNextOffset)
@ -265,7 +264,17 @@ class LogSegmentTest {
// test the case where we fully truncate the log // test the case where we fully truncate the log
val time = new MockTime val time = new MockTime
val seg = createSegment(40, time = time) val seg = createSegment(40, time = time)
seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there")) seg.append(
41,
MemoryRecords.withRecords(
RecordBatch.MAGIC_VALUE_V1,
40,
Compression.NONE,
TimestampType.CREATE_TIME,
new SimpleRecord("hello".getBytes()),
new SimpleRecord("there".getBytes())
)
)
// If the segment is empty after truncation, the create time should be reset // If the segment is empty after truncation, the create time should be reset
time.sleep(500) time.sleep(500)
@ -277,7 +286,7 @@ class LogSegmentTest {
assertFalse(seg.offsetIndex.isFull) assertFalse(seg.offsetIndex.isFull)
assertNull(seg.read(0, 1024), "Segment should be empty.") assertNull(seg.read(0, 1024), "Segment should be empty.")
seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there")) seg.append(41, records(40, "hello", "there"))
} }
/** /**
@ -289,7 +298,7 @@ class LogSegmentTest {
val seg = createSegment(40, messageSize * 2 - 1) val seg = createSegment(40, messageSize * 2 - 1)
// Produce some messages // Produce some messages
for (i <- 40 until 50) for (i <- 40 until 50)
seg.append(i, i * 10, i, records(i, s"msg$i")) seg.append(i, records(i, s"msg$i"))
assertEquals(490, seg.largestTimestamp) assertEquals(490, seg.largestTimestamp)
// Search for an indexed timestamp // Search for an indexed timestamp
@ -313,7 +322,7 @@ class LogSegmentTest {
def testNextOffsetCalculation(): Unit = { def testNextOffsetCalculation(): Unit = {
val seg = createSegment(40) val seg = createSegment(40)
assertEquals(40, seg.readNextOffset) assertEquals(40, seg.readNextOffset)
seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you")) seg.append(52, records(50, "hello", "there", "you"))
assertEquals(53, seg.readNextOffset) assertEquals(53, seg.readNextOffset)
} }
@ -354,7 +363,7 @@ class LogSegmentTest {
def testRecoveryFixesCorruptIndex(): Unit = { def testRecoveryFixesCorruptIndex(): Unit = {
val seg = createSegment(0) val seg = createSegment(0)
for (i <- 0 until 100) for (i <- 0 until 100)
seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString)) seg.append(i, records(i, i.toString))
val indexFile = seg.offsetIndexFile val indexFile = seg.offsetIndexFile
writeNonsenseToFile(indexFile, 5, indexFile.length.toInt) writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
seg.recover(newProducerStateManager(), Optional.empty()) seg.recover(newProducerStateManager(), Optional.empty())
@ -375,25 +384,22 @@ class LogSegmentTest {
val pid2 = 10L val pid2 = 10L
// append transactional records from pid1 // append transactional records from pid1
segment.append(101L, RecordBatch.NO_TIMESTAMP, segment.append(101L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
100L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// append transactional records from pid2 // append transactional records from pid2
segment.append(103L, RecordBatch.NO_TIMESTAMP, 102L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE, segment.append(103L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// append non-transactional records // append non-transactional records
segment.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, segment.append(105L, MemoryRecords.withRecords(104L, Compression.NONE,
partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed) // abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed)
segment.append(106L, RecordBatch.NO_TIMESTAMP, 106L, segment.append(106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
// commit the transaction from pid1 // commit the transaction from pid1
segment.append(107L, RecordBatch.NO_TIMESTAMP, 107L, segment.append(107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
var stateManager = newProducerStateManager() var stateManager = newProducerStateManager()
segment.recover(stateManager, Optional.empty()) segment.recover(stateManager, Optional.empty())
@ -434,16 +440,16 @@ class LogSegmentTest {
val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)) val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1))
val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime())) val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime()))
seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0, seg.append(105L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
seg.append(107L, RecordBatch.NO_TIMESTAMP, 106L, MemoryRecords.withRecords(106L, Compression.NONE, 1, seg.append(107L, MemoryRecords.withRecords(106L, Compression.NONE, 1,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
seg.append(109L, RecordBatch.NO_TIMESTAMP, 108L, MemoryRecords.withRecords(108L, Compression.NONE, 1, seg.append(109L, MemoryRecords.withRecords(108L, Compression.NONE, 1,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
seg.append(111L, RecordBatch.NO_TIMESTAMP, 110, MemoryRecords.withRecords(110L, Compression.NONE, 2, seg.append(111L, MemoryRecords.withRecords(110L, Compression.NONE, 2,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
seg.recover(newProducerStateManager(), Optional.of(cache)) seg.recover(newProducerStateManager(), Optional.of(cache))
@ -472,7 +478,7 @@ class LogSegmentTest {
def testRecoveryFixesCorruptTimeIndex(): Unit = { def testRecoveryFixesCorruptTimeIndex(): Unit = {
val seg = createSegment(0) val seg = createSegment(0)
for (i <- 0 until 100) for (i <- 0 until 100)
seg.append(i, i * 10, i, records(i, i.toString)) seg.append(i, records(i, i.toString))
val timeIndexFile = seg.timeIndexFile val timeIndexFile = seg.timeIndexFile
writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt) writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
seg.recover(newProducerStateManager(), Optional.empty()) seg.recover(newProducerStateManager(), Optional.empty())
@ -492,7 +498,7 @@ class LogSegmentTest {
for (_ <- 0 until 10) { for (_ <- 0 until 10) {
val seg = createSegment(0) val seg = createSegment(0)
for (i <- 0 until messagesAppended) for (i <- 0 until messagesAppended)
seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString)) seg.append(i, records(i, i.toString))
val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended) val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
// start corrupting somewhere in the middle of the chosen record all the way to the end // start corrupting somewhere in the middle of the chosen record all the way to the end
@ -523,9 +529,9 @@ class LogSegmentTest {
def testCreateWithInitFileSizeAppendMessage(): Unit = { def testCreateWithInitFileSizeAppendMessage(): Unit = {
val seg = createSegment(40, fileAlreadyExists = false, 512*1024*1024, preallocate = true) val seg = createSegment(40, fileAlreadyExists = false, 512*1024*1024, preallocate = true)
val ms = records(50, "hello", "there") val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) seg.append(51, ms)
val ms2 = records(60, "alpha", "beta") val ms2 = records(60, "alpha", "beta")
seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2) seg.append(61, ms2)
val read = seg.read(55, 200) val read = seg.read(55, 200)
checkEquals(ms2.records.iterator, read.records.records.iterator) checkEquals(ms2.records.iterator, read.records.records.iterator)
} }
@ -544,9 +550,9 @@ class LogSegmentTest {
512 * 1024 * 1024, true) 512 * 1024 * 1024, true)
val ms = records(50, "hello", "there") val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) seg.append(51, ms)
val ms2 = records(60, "alpha", "beta") val ms2 = records(60, "alpha", "beta")
seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2) seg.append(61, ms2)
val read = seg.read(55, 200) val read = seg.read(55, 200)
checkEquals(ms2.records.iterator, read.records.records.iterator) checkEquals(ms2.records.iterator, read.records.records.iterator)
val oldSize = seg.log.sizeInBytes() val oldSize = seg.log.sizeInBytes()
@ -581,9 +587,9 @@ class LogSegmentTest {
//Given two messages with a gap between them (e.g. mid offset compacted away) //Given two messages with a gap between them (e.g. mid offset compacted away)
val ms1 = records(offset, "first message") val ms1 = records(offset, "first message")
seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1) seg.append(offset, ms1)
val ms2 = records(offset + 3, "message after gap") val ms2 = records(offset + 3, "message after gap")
seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2) seg.append(offset + 3, ms2)
// When we truncate to an offset without a corresponding log entry // When we truncate to an offset without a corresponding log entry
seg.truncateTo(offset + 1) seg.truncateTo(offset + 1)
@ -629,12 +635,80 @@ class LogSegmentTest {
val segment = createSegment(1) val segment = createSegment(1)
assertEquals(Long.MaxValue, segment.getFirstBatchTimestamp) assertEquals(Long.MaxValue, segment.getFirstBatchTimestamp)
segment.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes))) segment.append(1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes)))
assertEquals(1000L, segment.getFirstBatchTimestamp) assertEquals(1000L, segment.getFirstBatchTimestamp)
segment.close() segment.close()
} }
@Test
def testIndexForMultipleBatchesInMemoryRecords(): Unit = {
val segment = createSegment(0, 1, Time.SYSTEM)
val buffer1 = ByteBuffer.allocate(1024)
// append first batch to buffer1
var builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 0)
builder.append(0L, "key1".getBytes, "value1".getBytes)
builder.close()
// append second batch to buffer1
builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 1)
builder.append(1L, "key1".getBytes, "value1".getBytes)
builder.close()
buffer1.flip()
var record = MemoryRecords.readableRecords(buffer1)
segment.append(1L, record)
val buffer2 = ByteBuffer.allocate(1024)
// append first batch to buffer2
builder = MemoryRecords.builder(buffer2, Compression.NONE, TimestampType.CREATE_TIME, 2)
builder.append(2L, "key1".getBytes, "value1".getBytes)
builder.close()
buffer2.flip()
record = MemoryRecords.readableRecords(buffer2)
segment.append(2L, record)
assertEquals(2, segment.offsetIndex.entries)
assertEquals(1, segment.offsetIndex.entry(0).offset)
assertEquals(2, segment.offsetIndex.entry(1).offset)
assertEquals(2, segment.timeIndex.entries)
assertEquals(new TimestampOffset(1, 1), segment.timeIndex.entry(0))
assertEquals(new TimestampOffset(2, 2), segment.timeIndex.entry(1))
}
@Test
def testNonMonotonicTimestampForMultipleBatchesInMemoryRecords(): Unit = {
val segment = createSegment(0, 1, Time.SYSTEM)
val buffer1 = ByteBuffer.allocate(1024)
// append first batch to buffer1
var builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 0)
builder.append(1L, "key1".getBytes, "value1".getBytes)
builder.close()
// append second batch to buffer1
builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 1)
builder.append(0L, "key1".getBytes, "value1".getBytes)
builder.close()
// append third batch to buffer1
builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 2)
builder.append(2L, "key1".getBytes, "value1".getBytes)
builder.close()
buffer1.flip()
val record = MemoryRecords.readableRecords(buffer1)
segment.append(2L, record)
assertEquals(2, segment.offsetIndex.entries)
assertEquals(1, segment.offsetIndex.entry(0).offset)
assertEquals(2, segment.offsetIndex.entry(1).offset)
assertEquals(2, segment.timeIndex.entries)
assertEquals(new TimestampOffset(1, 0), segment.timeIndex.entry(0))
assertEquals(new TimestampOffset(2, 2), segment.timeIndex.entry(1))
}
private def newProducerStateManager(): ProducerStateManager = { private def newProducerStateManager(): ProducerStateManager = {
new ProducerStateManager( new ProducerStateManager(
topicPartition, topicPartition,


@ -187,12 +187,6 @@ class LogValidatorTest {
assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
// If it's LOG_APPEND_TIME, the offset will be the offset of the first record // If it's LOG_APPEND_TIME, the offset will be the offset of the first record
val expectedMaxTimestampOffset = magic match {
case RecordBatch.MAGIC_VALUE_V0 => -1
case RecordBatch.MAGIC_VALUE_V1 => 0
case _ => 2
}
assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp)
verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
compressed = false) compressed = false)
} }
@ -236,8 +230,6 @@ class LogValidatorTest {
"MessageSet should still valid") "MessageSet should still valid")
assertEquals(now, validatedResults.maxTimestampMs, assertEquals(now, validatedResults.maxTimestampMs,
s"Max timestamp should be $now") s"Max timestamp should be $now")
assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
s"The shallow offset of max timestamp should be 2 if logAppendTime is used")
assertTrue(validatedResults.messageSizeMaybeChanged, assertTrue(validatedResults.messageSizeMaybeChanged,
"Message size may have been changed") "Message size may have been changed")
@ -289,8 +281,6 @@ class LogValidatorTest {
"MessageSet should still valid") "MessageSet should still valid")
assertEquals(now, validatedResults.maxTimestampMs, assertEquals(now, validatedResults.maxTimestampMs,
s"Max timestamp should be $now") s"Max timestamp should be $now")
assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
s"The shallow offset of max timestamp should be the last offset 2 if logAppendTime is used")
assertFalse(validatedResults.messageSizeMaybeChanged, assertFalse(validatedResults.messageSizeMaybeChanged,
"Message size should not have been changed") "Message size should not have been changed")
@ -426,10 +416,8 @@ class LogValidatorTest {
// V1: 3 batches are in the records, so the shallow OffsetOfMaxTimestamp is the timestamp of batch-1 // V1: 3 batches are in the records, so the shallow OffsetOfMaxTimestamp is the timestamp of batch-1
if (magic >= RecordBatch.MAGIC_VALUE_V2) { if (magic >= RecordBatch.MAGIC_VALUE_V2) {
assertEquals(1, records.batches().asScala.size) assertEquals(1, records.batches().asScala.size)
assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)
} else { } else {
assertEquals(3, records.batches().asScala.size) assertEquals(3, records.batches().asScala.size)
assertEquals(1, validatingResults.shallowOffsetOfMaxTimestamp)
} }
assertFalse(validatingResults.messageSizeMaybeChanged, assertFalse(validatingResults.messageSizeMaybeChanged,
@ -511,7 +499,6 @@ class LogValidatorTest {
// Both V2 and V1 has single batch in the validated records when compression is enable, and hence their shallow // Both V2 and V1 has single batch in the validated records when compression is enable, and hence their shallow
// OffsetOfMaxTimestamp is the last offset of the single batch // OffsetOfMaxTimestamp is the last offset of the single batch
assertEquals(1, validatedRecords.batches().asScala.size) assertEquals(1, validatedRecords.batches().asScala.size)
assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)
assertTrue(validatingResults.messageSizeMaybeChanged, assertTrue(validatingResults.messageSizeMaybeChanged,
"Message size should have been changed") "Message size should have been changed")
@ -563,7 +550,6 @@ class LogValidatorTest {
} }
assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP, assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}") s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
@ -610,8 +596,6 @@ class LogValidatorTest {
assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence) assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
} }
assertEquals(timestamp, validatedResults.maxTimestampMs) assertEquals(timestamp, validatedResults.maxTimestampMs)
assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
s"Offset of max timestamp should be the last offset 2.")
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
@ -682,9 +666,6 @@ class LogValidatorTest {
} }
assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
val expectedShallowOffsetOfMaxTimestamp = 2
assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp,
s"Shallow offset of max timestamp should be 2")
assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,


@ -4393,8 +4393,8 @@ class UnifiedLogTest {
segments.add(seg2) segments.add(seg2)
assertEquals(Seq(Long.MaxValue, Long.MaxValue), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq) assertEquals(Seq(Long.MaxValue, Long.MaxValue), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
seg1.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes))) seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes)))
seg2.append(2, 2000L, 1, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord("two".getBytes))) seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord(2000L, "two".getBytes)))
assertEquals(Seq(1000L, 2000L), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq) assertEquals(Seq(1000L, 2000L), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
seg1.close() seg1.close()


@ -106,7 +106,6 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
lastOffset, lastOffset,
lastEpoch, lastEpoch,
maxTimestamp, maxTimestamp,
shallowOffsetOfMaxTimestamp,
Time.SYSTEM.milliseconds(), Time.SYSTEM.milliseconds(),
state.logStartOffset, state.logStartOffset,
RecordValidationStats.EMPTY, RecordValidationStats.EMPTY,


@ -778,7 +778,6 @@ class ReplicaFetcherThreadTest {
0, 0,
OptionalInt.empty, OptionalInt.empty,
RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP,
-1L,
RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP,
-1L, -1L,
RecordValidationStats.EMPTY, RecordValidationStats.EMPTY,


@ -36,7 +36,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord} import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache} import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, SimpleRecord} import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde} import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde}
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue} import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue}
@ -394,8 +394,8 @@ class DumpLogSegmentsTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024) val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, time.scheduler, time) log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, time.scheduler, time)
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0) log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0)
val secondSegment = log.roll(); val secondSegment = log.roll()
secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*)) secondSegment.append(1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords: _*))
secondSegment.flush() secondSegment.flush()
log.flush(true) log.flush(true)


@ -31,13 +31,12 @@ import java.util.OptionalInt;
public class LogAppendInfo { public class LogAppendInfo {
public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(-1, -1, OptionalInt.empty(), public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(-1, -1, OptionalInt.empty(),
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, -1L,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L); RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
private long firstOffset; private long firstOffset;
private long lastOffset; private long lastOffset;
private long maxTimestamp; private long maxTimestamp;
private long shallowOffsetOfMaxTimestamp;
private long logAppendTime; private long logAppendTime;
private long logStartOffset; private long logStartOffset;
private RecordValidationStats recordValidationStats; private RecordValidationStats recordValidationStats;
@ -52,31 +51,29 @@ public class LogAppendInfo {
/** /**
* Creates an instance with the given params. * Creates an instance with the given params.
* *
* @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
* to the follower. * to the follower.
* @param lastOffset The last offset in the message set * @param lastOffset The last offset in the message set
* @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
* @param maxTimestamp The maximum timestamp of the message set. * @param maxTimestamp The maximum timestamp of the message set.
* @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp. * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp * @param logStartOffset The start offset of the log at the time of this append.
* @param logStartOffset The start offset of the log at the time of this append. * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
* @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` * @param sourceCompression The source codec used in the message set (send by the producer)
* @param sourceCompression The source codec used in the message set (send by the producer) * @param validBytes The number of valid bytes
* @param validBytes The number of valid bytes * @param lastOffsetOfFirstBatch The last offset of the first batch
* @param lastOffsetOfFirstBatch The last offset of the first batch
*/ */
public LogAppendInfo(long firstOffset, public LogAppendInfo(long firstOffset,
long lastOffset, long lastOffset,
OptionalInt lastLeaderEpoch, OptionalInt lastLeaderEpoch,
long maxTimestamp, long maxTimestamp,
long shallowOffsetOfMaxTimestamp,
long logAppendTime, long logAppendTime,
long logStartOffset, long logStartOffset,
RecordValidationStats recordValidationStats, RecordValidationStats recordValidationStats,
CompressionType sourceCompression, CompressionType sourceCompression,
int validBytes, int validBytes,
long lastOffsetOfFirstBatch) { long lastOffsetOfFirstBatch) {
this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset,
recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(), recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(),
LeaderHwChange.NONE); LeaderHwChange.NONE);
} }
@ -84,27 +81,25 @@ public class LogAppendInfo {
/** /**
* Creates an instance with the given params. * Creates an instance with the given params.
* *
* @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
* to the follower. * to the follower.
* @param lastOffset The last offset in the message set * @param lastOffset The last offset in the message set
* @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available. * @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
* @param maxTimestamp The maximum timestamp of the message set. * @param maxTimestamp The maximum timestamp of the message set.
* @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp. * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp * @param logStartOffset The start offset of the log at the time of this append.
* @param logStartOffset The start offset of the log at the time of this append. * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
* @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` * @param sourceCompression The source codec used in the message set (send by the producer)
* @param sourceCompression The source codec used in the message set (send by the producer) * @param validBytes The number of valid bytes
* @param validBytes The number of valid bytes * @param lastOffsetOfFirstBatch The last offset of the first batch
* @param lastOffsetOfFirstBatch The last offset of the first batch * @param recordErrors List of record errors that caused the respective batch to be dropped
* @param recordErrors List of record errors that caused the respective batch to be dropped * @param leaderHwChange Incremental if the high watermark needs to be increased after appending record
* @param leaderHwChange Incremental if the high watermark needs to be increased after appending record * Same if high watermark is not changed. None is the default value and it means append failed
* Same if high watermark is not changed. None is the default value and it means append failed
*/ */
public LogAppendInfo(long firstOffset, public LogAppendInfo(long firstOffset,
long lastOffset, long lastOffset,
OptionalInt lastLeaderEpoch, OptionalInt lastLeaderEpoch,
long maxTimestamp, long maxTimestamp,
long shallowOffsetOfMaxTimestamp,
long logAppendTime, long logAppendTime,
long logStartOffset, long logStartOffset,
RecordValidationStats recordValidationStats, RecordValidationStats recordValidationStats,
@ -117,7 +112,6 @@ public class LogAppendInfo {
this.lastOffset = lastOffset; this.lastOffset = lastOffset;
this.lastLeaderEpoch = lastLeaderEpoch; this.lastLeaderEpoch = lastLeaderEpoch;
this.maxTimestamp = maxTimestamp; this.maxTimestamp = maxTimestamp;
this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
this.logAppendTime = logAppendTime; this.logAppendTime = logAppendTime;
this.logStartOffset = logStartOffset; this.logStartOffset = logStartOffset;
this.recordValidationStats = recordValidationStats; this.recordValidationStats = recordValidationStats;
@ -156,14 +150,6 @@ public class LogAppendInfo {
this.maxTimestamp = maxTimestamp; this.maxTimestamp = maxTimestamp;
} }
public long shallowOffsetOfMaxTimestamp() {
return shallowOffsetOfMaxTimestamp;
}
public void setShallowOffsetOfMaxTimestamp(long shallowOffsetOfMaxTimestamp) {
this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
}
public long logAppendTime() { public long logAppendTime() {
return logAppendTime; return logAppendTime;
} }
@ -233,12 +219,12 @@ public class LogAppendInfo {
* @return a new instance with the given LeaderHwChange * @return a new instance with the given LeaderHwChange
*/ */
public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) { public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats, return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange); sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange);
} }
public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStartOffset) { public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStartOffset) {
return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L); RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
} }
@ -248,7 +234,7 @@ public class LogAppendInfo {
* in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors * in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors
*/ */
public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List<RecordError> recordErrors) { public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List<RecordError> recordErrors) {
return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE); RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE);
} }
@ -259,7 +245,6 @@ public class LogAppendInfo {
", lastOffset=" + lastOffset + ", lastOffset=" + lastOffset +
", lastLeaderEpoch=" + lastLeaderEpoch + ", lastLeaderEpoch=" + lastLeaderEpoch +
", maxTimestamp=" + maxTimestamp + ", maxTimestamp=" + maxTimestamp +
", shallowOffsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp +
", logAppendTime=" + logAppendTime + ", logAppendTime=" + logAppendTime +
", logStartOffset=" + logStartOffset + ", logStartOffset=" + logStartOffset +
", recordConversionStats=" + recordValidationStats + ", recordConversionStats=" + recordValidationStats +


@ -239,38 +239,38 @@ public class LogSegment implements Closeable {
* It is assumed this method is being called from within a lock, it is not thread-safe otherwise. * It is assumed this method is being called from within a lock, it is not thread-safe otherwise.
* *
* @param largestOffset The last offset in the message set * @param largestOffset The last offset in the message set
* @param largestTimestampMs The largest timestamp in the message set. * @param records The log entries to append.
* @param shallowOffsetOfMaxTimestamp The last offset of earliest batch with max timestamp in the messages to append.
* @param records The log entries to append.
* @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
*/ */
public void append(long largestOffset, public void append(long largestOffset,
long largestTimestampMs,
long shallowOffsetOfMaxTimestamp,
MemoryRecords records) throws IOException { MemoryRecords records) throws IOException {
if (records.sizeInBytes() > 0) { if (records.sizeInBytes() > 0) {
LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}", LOGGER.trace("Inserting {} bytes at end offset {} at position {}",
records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp); records.sizeInBytes(), largestOffset, log.sizeInBytes());
int physicalPosition = log.sizeInBytes(); int physicalPosition = log.sizeInBytes();
if (physicalPosition == 0)
rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
ensureOffsetInRange(largestOffset); ensureOffsetInRange(largestOffset);
// append the messages // append the messages
long appendedBytes = log.append(records); long appendedBytes = log.append(records);
LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset); LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
// Update the in memory max timestamp and corresponding offset.
if (largestTimestampMs > maxTimestampSoFar()) { for (RecordBatch batch : records.batches()) {
maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp); long batchMaxTimestamp = batch.maxTimestamp();
long batchLastOffset = batch.lastOffset();
if (batchMaxTimestamp > maxTimestampSoFar()) {
maxTimestampAndOffsetSoFar = new TimestampOffset(batchMaxTimestamp, batchLastOffset);
}
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex().append(batchLastOffset, physicalPosition);
timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
bytesSinceLastIndexEntry = 0;
}
int sizeInBytes = batch.sizeInBytes();
physicalPosition += sizeInBytes;
bytesSinceLastIndexEntry += sizeInBytes;
} }
// append an entry to the index (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex().append(largestOffset, physicalPosition);
timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
bytesSinceLastIndexEntry = 0;
}
bytesSinceLastIndexEntry += records.sizeInBytes();
} }
} }
@ -281,8 +281,6 @@ public class LogSegment implements Closeable {
private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException { private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException {
int bytesToAppend = 0; int bytesToAppend = 0;
long maxTimestamp = Long.MIN_VALUE;
long shallowOffsetOfMaxTimestamp = Long.MIN_VALUE;
long maxOffset = Long.MIN_VALUE; long maxOffset = Long.MIN_VALUE;
ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024); ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024);
@ -291,10 +289,6 @@ public class LogSegment implements Closeable {
Iterator<FileChannelRecordBatch> nextBatches = records.batchesFrom(position).iterator(); Iterator<FileChannelRecordBatch> nextBatches = records.batchesFrom(position).iterator();
FileChannelRecordBatch batch; FileChannelRecordBatch batch;
while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) { while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) {
if (batch.maxTimestamp() > maxTimestamp) {
maxTimestamp = batch.maxTimestamp();
shallowOffsetOfMaxTimestamp = batch.lastOffset();
}
maxOffset = batch.lastOffset(); maxOffset = batch.lastOffset();
bytesToAppend += batch.sizeInBytes(); bytesToAppend += batch.sizeInBytes();
} }
@ -307,7 +301,7 @@ public class LogSegment implements Closeable {
readBuffer.limit(bytesToAppend); readBuffer.limit(bytesToAppend);
records.readInto(readBuffer, position); records.readInto(readBuffer, position);
append(maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer)); append(maxOffset, MemoryRecords.readableRecords(readBuffer));
} }
bufferSupplier.release(readBuffer); bufferSupplier.release(readBuffer);


@ -69,20 +69,15 @@ public class LogValidator {
public final long logAppendTimeMs; public final long logAppendTimeMs;
public final MemoryRecords validatedRecords; public final MemoryRecords validatedRecords;
public final long maxTimestampMs; public final long maxTimestampMs;
// we only maintain batch level offset for max timestamp since we want to align the behavior of updating time
// indexing entries. The paths of follower append and replica recovery do not iterate all records, so they have no
// idea about record level offset for max timestamp.
public final long shallowOffsetOfMaxTimestamp;
public final boolean messageSizeMaybeChanged; public final boolean messageSizeMaybeChanged;
public final RecordValidationStats recordValidationStats; public final RecordValidationStats recordValidationStats;
public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs, public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs,
long shallowOffsetOfMaxTimestamp, boolean messageSizeMaybeChanged, boolean messageSizeMaybeChanged,
RecordValidationStats recordValidationStats) { RecordValidationStats recordValidationStats) {
this.logAppendTimeMs = logAppendTimeMs; this.logAppendTimeMs = logAppendTimeMs;
this.validatedRecords = validatedRecords; this.validatedRecords = validatedRecords;
this.maxTimestampMs = maxTimestampMs; this.maxTimestampMs = maxTimestampMs;
this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
this.messageSizeMaybeChanged = messageSizeMaybeChanged; this.messageSizeMaybeChanged = messageSizeMaybeChanged;
this.recordValidationStats = recordValidationStats; this.recordValidationStats = recordValidationStats;
} }
@ -236,7 +231,6 @@ public class LogValidator {
now, now,
convertedRecords, convertedRecords,
info.maxTimestamp, info.maxTimestamp,
info.shallowOffsetOfMaxTimestamp,
true, true,
recordValidationStats); recordValidationStats);
} }
@ -246,8 +240,6 @@ public class LogValidator {
MetricsRecorder metricsRecorder) { MetricsRecorder metricsRecorder) {
long now = time.milliseconds(); long now = time.milliseconds();
long maxTimestamp = RecordBatch.NO_TIMESTAMP; long maxTimestamp = RecordBatch.NO_TIMESTAMP;
long shallowOffsetOfMaxTimestamp = -1L;
long initialOffset = offsetCounter.value;
RecordBatch firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE); RecordBatch firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE);
@ -276,7 +268,6 @@ public class LogValidator {
if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) { if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
maxTimestamp = maxBatchTimestamp; maxTimestamp = maxBatchTimestamp;
shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
} }
batch.setLastOffset(offsetCounter.value - 1); batch.setLastOffset(offsetCounter.value - 1);
@ -293,23 +284,10 @@ public class LogValidator {
} }
if (timestampType == TimestampType.LOG_APPEND_TIME) { if (timestampType == TimestampType.LOG_APPEND_TIME) {
maxTimestamp = now; if (toMagic == RecordBatch.MAGIC_VALUE_V0) {
// those checks should be equal to MemoryRecordsBuilder#info maxTimestamp = RecordBatch.NO_TIMESTAMP;
switch (toMagic) { } else {
case RecordBatch.MAGIC_VALUE_V0: maxTimestamp = now;
maxTimestamp = RecordBatch.NO_TIMESTAMP;
// value will be the default value: -1
shallowOffsetOfMaxTimestamp = -1;
break;
case RecordBatch.MAGIC_VALUE_V1:
// Those single-record batches have same max timestamp, so the initial offset is equal with
// the last offset of earliest batch
shallowOffsetOfMaxTimestamp = initialOffset;
break;
default:
// there is only one batch so use the last offset
shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
break;
} }
} }
@ -317,7 +295,6 @@ public class LogValidator {
now, now,
records, records,
maxTimestamp, maxTimestamp,
shallowOffsetOfMaxTimestamp,
false, false,
RecordValidationStats.EMPTY); RecordValidationStats.EMPTY);
} }
@ -445,7 +422,6 @@ public class LogValidator {
now, now,
records, records,
maxTimestamp, maxTimestamp,
lastOffset,
false, false,
recordValidationStats); recordValidationStats);
} }
@ -487,7 +463,6 @@ public class LogValidator {
logAppendTime, logAppendTime,
records, records,
info.maxTimestamp, info.maxTimestamp,
info.shallowOffsetOfMaxTimestamp,
true, true,
recordValidationStats); recordValidationStats);
} }