KAFKA-806 Index may not always observe log.index.interval.bytes (#18843)

Currently, each log.append() adds at most one index entry, even when the appended data is larger than log.index.interval.bytes. One potential issue is that if a follower restarts after being down for a long time, it may fetch data much larger than log.index.interval.bytes at a time. This means fewer index entries are created than expected, which can increase fetch time for consumers.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
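
To make the index accounting concrete, here is a small, self-contained illustration (not Kafka code; the class name, method names, and sizes are hypothetical) contrasting the old one-check-per-append behavior with the per-batch check this patch introduces in LogSegment.append (see the LogSegment.java hunk below):

// Illustration only: a toy model of offset-index accounting, not Kafka code.
public final class IndexIntervalSketch {

    // Old accounting: the interval check runs once per append() call, so a single
    // large append adds at most one index entry, however big it is.
    static int oldPolicyEntries(int appendBytes, int indexIntervalBytes) {
        int entries = 0;
        int bytesSinceLastIndexEntry = 0; // fresh segment for the illustration
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
            entries++;
            bytesSinceLastIndexEntry = 0;
        }
        bytesSinceLastIndexEntry += appendBytes;
        return entries;
    }

    // New accounting: the interval check runs once per batch inside the append, so a
    // large multi-batch append keeps adding entries as the interval is crossed.
    static int newPolicyEntries(int batchCount, int batchSizeBytes, int indexIntervalBytes) {
        int entries = 0;
        int bytesSinceLastIndexEntry = 0;
        for (int i = 0; i < batchCount; i++) {
            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
                entries++;
                bytesSinceLastIndexEntry = 0;
            }
            bytesSinceLastIndexEntry += batchSizeBytes;
        }
        return entries;
    }

    public static void main(String[] args) {
        int batches = 1024;   // e.g. a follower catching up after a long outage
        int batchSize = 1024; // 1 KiB per batch
        int interval = 4096;  // log.index.interval.bytes
        System.out.println("old policy entries: " + oldPolicyEntries(batches * batchSize, interval)); // 0
        System.out.println("new policy entries: " + newPolicyEntries(batches, batchSize, interval));  // 204
    }
}

With a 1 MiB append and a 4 KiB interval, the old path adds no new index entries for this call (at most one, if earlier appends had already accumulated bytes), while the per-batch path produces roughly one entry per interval's worth of batches (204 here), which is what the configuration asks for.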
PoAn Yang 2025-02-11 01:24:19 +08:00 committed by Chia-Ping Tsai
parent 44cf3e60a1
commit f9eb066edb
12 changed files with 173 additions and 169 deletions


@ -413,8 +413,8 @@ class LocalLog(@volatile private var _dir: File,
}
}
private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
private[log] def append(lastOffset: Long, records: MemoryRecords): Unit = {
segments.activeSegment.append(lastOffset, records)
updateLogEndOffset(lastOffset + 1)
}


@ -809,7 +809,7 @@ private[log] class Cleaner(val id: Int,
val retained = MemoryRecords.readableRecords(outputBuffer)
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called
dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp(), retained)
dest.append(result.maxOffset, retained)
throttler.maybeThrottle(outputBuffer.limit())
}


@ -818,7 +818,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)
appendInfo.setLastOffset(offset.value - 1)
appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
@ -904,7 +903,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// will be cleaned up after the log directory is recovered. Note that the end offset of the
// ProducerStateManager will not be updated and the last stable offset will not advance
// if the append to the transaction index fails.
localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords)
localLog.append(appendInfo.lastOffset, validRecords)
updateHighWatermarkWithLogEndOffset()
// update the producer state
@ -1190,7 +1189,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
else
OptionalInt.empty()
new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, shallowOffsetOfMaxTimestamp,
new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp,
RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression,
validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
}


@ -98,8 +98,6 @@ class LocalLogTest {
log: LocalLog = log,
initialOffset: Long = 0L): Unit = {
log.append(lastOffset = initialOffset + records.size - 1,
largestTimestamp = records.head.timestamp,
shallowOffsetOfMaxTimestamp = initialOffset,
records = MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*))
}


@ -34,6 +34,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.util.{Optional, OptionalLong}
import scala.collection._
import scala.jdk.CollectionConverters._
@ -86,11 +87,9 @@ class LogSegmentTest {
))
def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {
val seg = createSegment(baseOffset)
val currentTime = Time.SYSTEM.milliseconds()
val shallowOffsetOfMaxTimestamp = largestOffset
val memoryRecords = records(0, "hello")
assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords)
seg.append(largestOffset, memoryRecords)
})
}
@ -112,7 +111,7 @@ class LogSegmentTest {
def testReadBeforeFirstOffset(): Unit = {
val seg = createSegment(40)
val ms = records(50, "hello", "there", "little", "bee")
seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms)
seg.append(53, ms)
val read = seg.read(41, 300).records
checkEquals(ms.records.iterator, read.records.iterator)
}
@ -124,7 +123,7 @@ class LogSegmentTest {
def testReadAfterLast(): Unit = {
val seg = createSegment(40)
val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
seg.append(51, ms)
val read = seg.read(52, 200)
assertNull(read, "Read beyond the last offset in the segment should give null")
}
@ -137,9 +136,9 @@ class LogSegmentTest {
def testReadFromGap(): Unit = {
val seg = createSegment(40)
val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
seg.append(51, ms)
val ms2 = records(60, "alpha", "beta")
seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
seg.append(61, ms2)
val read = seg.read(55, 200)
checkEquals(ms2.records.iterator, read.records.records.iterator)
}
@ -151,7 +150,7 @@ class LogSegmentTest {
val maxSize = 1
val seg = createSegment(40)
val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
seg.append(51, ms)
// read before first offset
var read = seg.read(48, maxSize, maxPosition, minOneMessage)
assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata)
@ -182,9 +181,9 @@ class LogSegmentTest {
var offset = 40
for (_ <- 0 until 30) {
val ms1 = records(offset, "hello")
seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
seg.append(offset, ms1)
val ms2 = records(offset + 1, "hello")
seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
seg.append(offset + 1, ms2)
// check that we can read back both messages
val read = seg.read(offset, 10000)
assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList)
@ -243,7 +242,7 @@ class LogSegmentTest {
val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
var offset = 40
for (_ <- 0 until numMessages) {
seg.append(offset, offset, offset, records(offset, "hello"))
seg.append(offset, records(offset, "hello"))
offset += 1
}
assertEquals(offset, seg.readNextOffset)
@ -265,7 +264,17 @@ class LogSegmentTest {
// test the case where we fully truncate the log
val time = new MockTime
val seg = createSegment(40, time = time)
seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
seg.append(
41,
MemoryRecords.withRecords(
RecordBatch.MAGIC_VALUE_V1,
40,
Compression.NONE,
TimestampType.CREATE_TIME,
new SimpleRecord("hello".getBytes()),
new SimpleRecord("there".getBytes())
)
)
// If the segment is empty after truncation, the create time should be reset
time.sleep(500)
@ -277,7 +286,7 @@ class LogSegmentTest {
assertFalse(seg.offsetIndex.isFull)
assertNull(seg.read(0, 1024), "Segment should be empty.")
seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
seg.append(41, records(40, "hello", "there"))
}
/**
@ -289,7 +298,7 @@ class LogSegmentTest {
val seg = createSegment(40, messageSize * 2 - 1)
// Produce some messages
for (i <- 40 until 50)
seg.append(i, i * 10, i, records(i, s"msg$i"))
seg.append(i, records(i, s"msg$i"))
assertEquals(490, seg.largestTimestamp)
// Search for an indexed timestamp
@ -313,7 +322,7 @@ class LogSegmentTest {
def testNextOffsetCalculation(): Unit = {
val seg = createSegment(40)
assertEquals(40, seg.readNextOffset)
seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
seg.append(52, records(50, "hello", "there", "you"))
assertEquals(53, seg.readNextOffset)
}
@ -354,7 +363,7 @@ class LogSegmentTest {
def testRecoveryFixesCorruptIndex(): Unit = {
val seg = createSegment(0)
for (i <- 0 until 100)
seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
seg.append(i, records(i, i.toString))
val indexFile = seg.offsetIndexFile
writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
seg.recover(newProducerStateManager(), Optional.empty())
@ -375,25 +384,22 @@ class LogSegmentTest {
val pid2 = 10L
// append transactional records from pid1
segment.append(101L, RecordBatch.NO_TIMESTAMP,
100L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
segment.append(101L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// append transactional records from pid2
segment.append(103L, RecordBatch.NO_TIMESTAMP, 102L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
segment.append(103L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// append non-transactional records
segment.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE,
segment.append(105L, MemoryRecords.withRecords(104L, Compression.NONE,
partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed)
segment.append(106L, RecordBatch.NO_TIMESTAMP, 106L,
endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
segment.append(106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
// commit the transaction from pid1
segment.append(107L, RecordBatch.NO_TIMESTAMP, 107L,
endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
segment.append(107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
var stateManager = newProducerStateManager()
segment.recover(stateManager, Optional.empty())
@ -434,16 +440,16 @@ class LogSegmentTest {
val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1))
val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime()))
seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
seg.append(105L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
seg.append(107L, RecordBatch.NO_TIMESTAMP, 106L, MemoryRecords.withRecords(106L, Compression.NONE, 1,
seg.append(107L, MemoryRecords.withRecords(106L, Compression.NONE, 1,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
seg.append(109L, RecordBatch.NO_TIMESTAMP, 108L, MemoryRecords.withRecords(108L, Compression.NONE, 1,
seg.append(109L, MemoryRecords.withRecords(108L, Compression.NONE, 1,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
seg.append(111L, RecordBatch.NO_TIMESTAMP, 110, MemoryRecords.withRecords(110L, Compression.NONE, 2,
seg.append(111L, MemoryRecords.withRecords(110L, Compression.NONE, 2,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
seg.recover(newProducerStateManager(), Optional.of(cache))
@ -472,7 +478,7 @@ class LogSegmentTest {
def testRecoveryFixesCorruptTimeIndex(): Unit = {
val seg = createSegment(0)
for (i <- 0 until 100)
seg.append(i, i * 10, i, records(i, i.toString))
seg.append(i, records(i, i.toString))
val timeIndexFile = seg.timeIndexFile
writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
seg.recover(newProducerStateManager(), Optional.empty())
@ -492,7 +498,7 @@ class LogSegmentTest {
for (_ <- 0 until 10) {
val seg = createSegment(0)
for (i <- 0 until messagesAppended)
seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
seg.append(i, records(i, i.toString))
val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
// start corrupting somewhere in the middle of the chosen record all the way to the end
@ -523,9 +529,9 @@ class LogSegmentTest {
def testCreateWithInitFileSizeAppendMessage(): Unit = {
val seg = createSegment(40, fileAlreadyExists = false, 512*1024*1024, preallocate = true)
val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
seg.append(51, ms)
val ms2 = records(60, "alpha", "beta")
seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
seg.append(61, ms2)
val read = seg.read(55, 200)
checkEquals(ms2.records.iterator, read.records.records.iterator)
}
@ -544,9 +550,9 @@ class LogSegmentTest {
512 * 1024 * 1024, true)
val ms = records(50, "hello", "there")
seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
seg.append(51, ms)
val ms2 = records(60, "alpha", "beta")
seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
seg.append(61, ms2)
val read = seg.read(55, 200)
checkEquals(ms2.records.iterator, read.records.records.iterator)
val oldSize = seg.log.sizeInBytes()
@ -581,9 +587,9 @@ class LogSegmentTest {
//Given two messages with a gap between them (e.g. mid offset compacted away)
val ms1 = records(offset, "first message")
seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
seg.append(offset, ms1)
val ms2 = records(offset + 3, "message after gap")
seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2)
seg.append(offset + 3, ms2)
// When we truncate to an offset without a corresponding log entry
seg.truncateTo(offset + 1)
@ -629,12 +635,80 @@ class LogSegmentTest {
val segment = createSegment(1)
assertEquals(Long.MaxValue, segment.getFirstBatchTimestamp)
segment.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes)))
segment.append(1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes)))
assertEquals(1000L, segment.getFirstBatchTimestamp)
segment.close()
}
@Test
def testIndexForMultipleBatchesInMemoryRecords(): Unit = {
val segment = createSegment(0, 1, Time.SYSTEM)
val buffer1 = ByteBuffer.allocate(1024)
// append first batch to buffer1
var builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 0)
builder.append(0L, "key1".getBytes, "value1".getBytes)
builder.close()
// append second batch to buffer1
builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 1)
builder.append(1L, "key1".getBytes, "value1".getBytes)
builder.close()
buffer1.flip
var record = MemoryRecords.readableRecords(buffer1)
segment.append(1L, record)
val buffer2 = ByteBuffer.allocate(1024)
// append first batch to buffer2
builder = MemoryRecords.builder(buffer2, Compression.NONE, TimestampType.CREATE_TIME, 2)
builder.append(2L, "key1".getBytes, "value1".getBytes)
builder.close()
buffer2.flip
record = MemoryRecords.readableRecords(buffer2)
segment.append(2L, record)
assertEquals(2, segment.offsetIndex.entries)
assertEquals(1, segment.offsetIndex.entry(0).offset)
assertEquals(2, segment.offsetIndex.entry(1).offset)
assertEquals(2, segment.timeIndex.entries)
assertEquals(new TimestampOffset(1, 1), segment.timeIndex.entry(0))
assertEquals(new TimestampOffset(2, 2), segment.timeIndex.entry(1))
}
@Test
def testNonMonotonicTimestampForMultipleBatchesInMemoryRecords(): Unit = {
val segment = createSegment(0, 1, Time.SYSTEM)
val buffer1 = ByteBuffer.allocate(1024)
// append first batch to buffer1
var builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 0)
builder.append(1L, "key1".getBytes, "value1".getBytes)
builder.close()
// append second batch to buffer1
builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 1)
builder.append(0L, "key1".getBytes, "value1".getBytes)
builder.close()
// append third batch to buffer1
builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 2)
builder.append(2L, "key1".getBytes, "value1".getBytes)
builder.close()
buffer1.flip()
val record = MemoryRecords.readableRecords(buffer1)
segment.append(2L, record)
assertEquals(2, segment.offsetIndex.entries)
assertEquals(1, segment.offsetIndex.entry(0).offset)
assertEquals(2, segment.offsetIndex.entry(1).offset)
assertEquals(2, segment.timeIndex.entries)
assertEquals(new TimestampOffset(1, 0), segment.timeIndex.entry(0))
assertEquals(new TimestampOffset(2, 2), segment.timeIndex.entry(1))
}
private def newProducerStateManager(): ProducerStateManager = {
new ProducerStateManager(
topicPartition,


@ -184,12 +184,6 @@ class LogValidatorTest {
assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
// If it's LOG_APPEND_TIME, the offset will be the offset of the first record
val expectedMaxTimestampOffset = magic match {
case RecordBatch.MAGIC_VALUE_V0 => -1
case RecordBatch.MAGIC_VALUE_V1 => 0
case _ => 2
}
assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp)
verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
compressed = false)
}
@ -233,8 +227,6 @@ class LogValidatorTest {
"MessageSet should still valid")
assertEquals(now, validatedResults.maxTimestampMs,
s"Max timestamp should be $now")
assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
s"The shallow offset of max timestamp should be 2 if logAppendTime is used")
assertTrue(validatedResults.messageSizeMaybeChanged,
"Message size may have been changed")
@ -286,8 +278,6 @@ class LogValidatorTest {
"MessageSet should still valid")
assertEquals(now, validatedResults.maxTimestampMs,
s"Max timestamp should be $now")
assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
s"The shallow offset of max timestamp should be the last offset 2 if logAppendTime is used")
assertFalse(validatedResults.messageSizeMaybeChanged,
"Message size should not have been changed")
@ -423,10 +413,8 @@ class LogValidatorTest {
// V1: 3 batches are in the records, so the shallow OffsetOfMaxTimestamp is the timestamp of batch-1
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
assertEquals(1, records.batches().asScala.size)
assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)
} else {
assertEquals(3, records.batches().asScala.size)
assertEquals(1, validatingResults.shallowOffsetOfMaxTimestamp)
}
assertFalse(validatingResults.messageSizeMaybeChanged,
@ -508,7 +496,6 @@ class LogValidatorTest {
// Both V2 and V1 has single batch in the validated records when compression is enable, and hence their shallow
// OffsetOfMaxTimestamp is the last offset of the single batch
assertEquals(1, validatedRecords.batches().asScala.size)
assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)
assertTrue(validatingResults.messageSizeMaybeChanged,
"Message size should have been changed")
@ -560,7 +547,6 @@ class LogValidatorTest {
}
assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp)
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
@ -607,8 +593,6 @@ class LogValidatorTest {
assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
}
assertEquals(timestamp, validatedResults.maxTimestampMs)
assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
s"Offset of max timestamp should be the last offset 2.")
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
@ -679,9 +663,6 @@ class LogValidatorTest {
}
assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
val expectedShallowOffsetOfMaxTimestamp = 2
assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp,
s"Shallow offset of max timestamp should be 2")
assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,


@ -4224,8 +4224,8 @@ class UnifiedLogTest {
segments.add(seg2)
assertEquals(Seq(Long.MaxValue, Long.MaxValue), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
seg1.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes)))
seg2.append(2, 2000L, 1, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord("two".getBytes)))
seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes)))
seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord(2000L, "two".getBytes)))
assertEquals(Seq(1000L, 2000L), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
seg1.close()


@ -106,7 +106,6 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
lastOffset,
lastEpoch,
maxTimestamp,
shallowOffsetOfMaxTimestamp,
Time.SYSTEM.milliseconds(),
state.logStartOffset,
RecordValidationStats.EMPTY,


@ -773,7 +773,6 @@ class ReplicaFetcherThreadTest {
0,
OptionalInt.empty,
RecordBatch.NO_TIMESTAMP,
-1L,
RecordBatch.NO_TIMESTAMP,
-1L,
RecordValidationStats.EMPTY,


@ -31,13 +31,12 @@ import java.util.OptionalInt;
public class LogAppendInfo {
public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(-1, -1, OptionalInt.empty(),
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, -1L,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
private long firstOffset;
private long lastOffset;
private long maxTimestamp;
private long shallowOffsetOfMaxTimestamp;
private long logAppendTime;
private long logStartOffset;
private RecordValidationStats recordValidationStats;
@ -52,31 +51,29 @@ public class LogAppendInfo {
/**
* Creates an instance with the given params.
*
* @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
* to the follower.
* @param lastOffset The last offset in the message set
* @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
* @param maxTimestamp The maximum timestamp of the message set.
* @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp.
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logStartOffset The start offset of the log at the time of this append.
* @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
* @param sourceCompression The source codec used in the message set (send by the producer)
* @param validBytes The number of valid bytes
* @param lastOffsetOfFirstBatch The last offset of the first batch
* @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
* to the follower.
* @param lastOffset The last offset in the message set
* @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
* @param maxTimestamp The maximum timestamp of the message set.
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logStartOffset The start offset of the log at the time of this append.
* @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
* @param sourceCompression The source codec used in the message set (send by the producer)
* @param validBytes The number of valid bytes
* @param lastOffsetOfFirstBatch The last offset of the first batch
*/
public LogAppendInfo(long firstOffset,
long lastOffset,
OptionalInt lastLeaderEpoch,
long maxTimestamp,
long shallowOffsetOfMaxTimestamp,
long logAppendTime,
long logStartOffset,
RecordValidationStats recordValidationStats,
CompressionType sourceCompression,
int validBytes,
long lastOffsetOfFirstBatch) {
this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset,
this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset,
recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(),
LeaderHwChange.NONE);
}
@ -84,27 +81,25 @@ public class LogAppendInfo {
/**
* Creates an instance with the given params.
*
* @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
* to the follower.
* @param lastOffset The last offset in the message set
* @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
* @param maxTimestamp The maximum timestamp of the message set.
* @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp.
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logStartOffset The start offset of the log at the time of this append.
* @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
* @param sourceCompression The source codec used in the message set (send by the producer)
* @param validBytes The number of valid bytes
* @param lastOffsetOfFirstBatch The last offset of the first batch
* @param recordErrors List of record errors that caused the respective batch to be dropped
* @param leaderHwChange Incremental if the high watermark needs to be increased after appending record
* Same if high watermark is not changed. None is the default value and it means append failed
* @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
* to the follower.
* @param lastOffset The last offset in the message set
* @param lastLeaderEpoch The partition leader epoch corresponding to the last offset, if available.
* @param maxTimestamp The maximum timestamp of the message set.
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param logStartOffset The start offset of the log at the time of this append.
* @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
* @param sourceCompression The source codec used in the message set (send by the producer)
* @param validBytes The number of valid bytes
* @param lastOffsetOfFirstBatch The last offset of the first batch
* @param recordErrors List of record errors that caused the respective batch to be dropped
* @param leaderHwChange Incremental if the high watermark needs to be increased after appending record
* Same if high watermark is not changed. None is the default value and it means append failed
*/
public LogAppendInfo(long firstOffset,
long lastOffset,
OptionalInt lastLeaderEpoch,
long maxTimestamp,
long shallowOffsetOfMaxTimestamp,
long logAppendTime,
long logStartOffset,
RecordValidationStats recordValidationStats,
@ -117,7 +112,6 @@ public class LogAppendInfo {
this.lastOffset = lastOffset;
this.lastLeaderEpoch = lastLeaderEpoch;
this.maxTimestamp = maxTimestamp;
this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
this.logAppendTime = logAppendTime;
this.logStartOffset = logStartOffset;
this.recordValidationStats = recordValidationStats;
@ -156,14 +150,6 @@ public class LogAppendInfo {
this.maxTimestamp = maxTimestamp;
}
public long shallowOffsetOfMaxTimestamp() {
return shallowOffsetOfMaxTimestamp;
}
public void setShallowOffsetOfMaxTimestamp(long shallowOffsetOfMaxTimestamp) {
this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
}
public long logAppendTime() {
return logAppendTime;
}
@ -233,12 +219,12 @@ public class LogAppendInfo {
* @return a new instance with the given LeaderHwChange
*/
public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange);
}
public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStartOffset) {
return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
}
@ -248,7 +234,7 @@ public class LogAppendInfo {
* in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors
*/
public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List<RecordError> recordErrors) {
return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE);
}
@ -259,7 +245,6 @@ public class LogAppendInfo {
", lastOffset=" + lastOffset +
", lastLeaderEpoch=" + lastLeaderEpoch +
", maxTimestamp=" + maxTimestamp +
", shallowOffsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp +
", logAppendTime=" + logAppendTime +
", logStartOffset=" + logStartOffset +
", recordConversionStats=" + recordValidationStats +


@ -237,38 +237,38 @@ public class LogSegment implements Closeable {
* It is assumed this method is being called from within a lock, it is not thread-safe otherwise.
*
* @param largestOffset The last offset in the message set
* @param largestTimestampMs The largest timestamp in the message set.
* @param shallowOffsetOfMaxTimestamp The last offset of earliest batch with max timestamp in the messages to append.
* @param records The log entries to append.
* @param records The log entries to append.
* @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
*/
public void append(long largestOffset,
long largestTimestampMs,
long shallowOffsetOfMaxTimestamp,
MemoryRecords records) throws IOException {
if (records.sizeInBytes() > 0) {
LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",
records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);
LOGGER.trace("Inserting {} bytes at end offset {} at position {}",
records.sizeInBytes(), largestOffset, log.sizeInBytes());
int physicalPosition = log.sizeInBytes();
if (physicalPosition == 0)
rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
ensureOffsetInRange(largestOffset);
// append the messages
long appendedBytes = log.append(records);
LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
// Update the in memory max timestamp and corresponding offset.
if (largestTimestampMs > maxTimestampSoFar()) {
maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
for (RecordBatch batch : records.batches()) {
long batchMaxTimestamp = batch.maxTimestamp();
long batchLastOffset = batch.lastOffset();
if (batchMaxTimestamp > maxTimestampSoFar()) {
maxTimestampAndOffsetSoFar = new TimestampOffset(batchMaxTimestamp, batchLastOffset);
}
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex().append(batchLastOffset, physicalPosition);
timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
bytesSinceLastIndexEntry = 0;
}
int sizeInBytes = batch.sizeInBytes();
physicalPosition += sizeInBytes;
bytesSinceLastIndexEntry += sizeInBytes;
}
// append an entry to the index (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex().append(largestOffset, physicalPosition);
timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
bytesSinceLastIndexEntry = 0;
}
bytesSinceLastIndexEntry += records.sizeInBytes();
}
}
@ -279,8 +279,6 @@ public class LogSegment implements Closeable {
private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException {
int bytesToAppend = 0;
long maxTimestamp = Long.MIN_VALUE;
long shallowOffsetOfMaxTimestamp = Long.MIN_VALUE;
long maxOffset = Long.MIN_VALUE;
ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024);
@ -289,10 +287,6 @@ public class LogSegment implements Closeable {
Iterator<FileChannelRecordBatch> nextBatches = records.batchesFrom(position).iterator();
FileChannelRecordBatch batch;
while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) {
if (batch.maxTimestamp() > maxTimestamp) {
maxTimestamp = batch.maxTimestamp();
shallowOffsetOfMaxTimestamp = batch.lastOffset();
}
maxOffset = batch.lastOffset();
bytesToAppend += batch.sizeInBytes();
}
@ -305,7 +299,7 @@ public class LogSegment implements Closeable {
readBuffer.limit(bytesToAppend);
records.readInto(readBuffer, position);
append(maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer));
append(maxOffset, MemoryRecords.readableRecords(readBuffer));
}
bufferSupplier.release(readBuffer);


@ -69,20 +69,15 @@ public class LogValidator {
public final long logAppendTimeMs;
public final MemoryRecords validatedRecords;
public final long maxTimestampMs;
// we only maintain batch level offset for max timestamp since we want to align the behavior of updating time
// indexing entries. The paths of follower append and replica recovery do not iterate all records, so they have no
// idea about record level offset for max timestamp.
public final long shallowOffsetOfMaxTimestamp;
public final boolean messageSizeMaybeChanged;
public final RecordValidationStats recordValidationStats;
public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs,
long shallowOffsetOfMaxTimestamp, boolean messageSizeMaybeChanged,
boolean messageSizeMaybeChanged,
RecordValidationStats recordValidationStats) {
this.logAppendTimeMs = logAppendTimeMs;
this.validatedRecords = validatedRecords;
this.maxTimestampMs = maxTimestampMs;
this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
this.messageSizeMaybeChanged = messageSizeMaybeChanged;
this.recordValidationStats = recordValidationStats;
}
@ -236,7 +231,6 @@ public class LogValidator {
now,
convertedRecords,
info.maxTimestamp,
info.shallowOffsetOfMaxTimestamp,
true,
recordValidationStats);
}
@ -246,8 +240,6 @@ public class LogValidator {
MetricsRecorder metricsRecorder) {
long now = time.milliseconds();
long maxTimestamp = RecordBatch.NO_TIMESTAMP;
long shallowOffsetOfMaxTimestamp = -1L;
long initialOffset = offsetCounter.value;
RecordBatch firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE);
@ -276,7 +268,6 @@ public class LogValidator {
if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
maxTimestamp = maxBatchTimestamp;
shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
}
batch.setLastOffset(offsetCounter.value - 1);
@ -293,23 +284,10 @@ public class LogValidator {
}
if (timestampType == TimestampType.LOG_APPEND_TIME) {
maxTimestamp = now;
// those checks should be equal to MemoryRecordsBuilder#info
switch (toMagic) {
case RecordBatch.MAGIC_VALUE_V0:
maxTimestamp = RecordBatch.NO_TIMESTAMP;
// value will be the default value: -1
shallowOffsetOfMaxTimestamp = -1;
break;
case RecordBatch.MAGIC_VALUE_V1:
// Those single-record batches have same max timestamp, so the initial offset is equal with
// the last offset of earliest batch
shallowOffsetOfMaxTimestamp = initialOffset;
break;
default:
// there is only one batch so use the last offset
shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
break;
if (toMagic == RecordBatch.MAGIC_VALUE_V0) {
maxTimestamp = RecordBatch.NO_TIMESTAMP;
} else {
maxTimestamp = now;
}
}
@ -317,7 +295,6 @@ public class LogValidator {
now,
records,
maxTimestamp,
shallowOffsetOfMaxTimestamp,
false,
RecordValidationStats.EMPTY);
}
@ -445,7 +422,6 @@ public class LogValidator {
now,
records,
maxTimestamp,
lastOffset,
false,
recordValidationStats);
}
@ -487,7 +463,6 @@ public class LogValidator {
logAppendTime,
records,
info.maxTimestamp,
info.shallowOffsetOfMaxTimestamp,
true,
recordValidationStats);
}