diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 09fa464af04..ae7e30dab29 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -413,8 +413,8 @@ class LocalLog(@volatile private var _dir: File,
     }
   }
 
-  private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
-    segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
+  private[log] def append(lastOffset: Long, records: MemoryRecords): Unit = {
+    segments.activeSegment.append(lastOffset, records)
     updateLogEndOffset(lastOffset + 1)
   }
 
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 51d19878754..34e5bc1b7e2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -809,7 +809,7 @@ private[log] class Cleaner(val id: Int,
       val retained = MemoryRecords.readableRecords(outputBuffer)
       // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
       // after `Log.replaceSegments` (which acquires the lock) is called
-      dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp(), retained)
+      dest.append(result.maxOffset, retained)
       throttler.maybeThrottle(outputBuffer.limit())
     }
 
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 3ce0fdabc9e..6bcfd8d078d 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -818,7 +818,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           validRecords = validateAndOffsetAssignResult.validatedRecords
           appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
-          appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)
           appendInfo.setLastOffset(offset.value - 1)
           appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
@@ -904,7 +903,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         // will be cleaned up after the log directory is recovered. Note that the end offset of the
         // ProducerStateManager will not be updated and the last stable offset will not advance
         // if the append to the transaction index fails.
-        localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords)
+        localLog.append(appendInfo.lastOffset, validRecords)
         updateHighWatermarkWithLogEndOffset()
 
         // update the producer state
@@ -1190,7 +1189,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       else
         OptionalInt.empty()
 
-    new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, shallowOffsetOfMaxTimestamp,
+    new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp,
       RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression,
       validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
   }
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
index cf1f1a06177..425ec7955eb 100644
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
@@ -98,8 +98,6 @@ class LocalLogTest {
                             log: LocalLog = log,
                             initialOffset: Long = 0L): Unit = {
     log.append(lastOffset = initialOffset + records.size - 1,
-      largestTimestamp = records.head.timestamp,
-      shallowOffsetOfMaxTimestamp = initialOffset,
       records = MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*))
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index e2272941ab3..ccb7586296b 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -34,6 +34,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
 import java.io.{File, RandomAccessFile}
+import java.nio.ByteBuffer
 import java.util.{Optional, OptionalLong}
 import scala.collection._
 import scala.jdk.CollectionConverters._
@@ -86,11 +87,9 @@ class LogSegmentTest {
   ))
   def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {
     val seg = createSegment(baseOffset)
-    val currentTime = Time.SYSTEM.milliseconds()
-    val shallowOffsetOfMaxTimestamp = largestOffset
     val memoryRecords = records(0, "hello")
     assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
-      seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords)
+      seg.append(largestOffset, memoryRecords)
     })
   }
 
@@ -112,7 +111,7 @@ class LogSegmentTest {
   def testReadBeforeFirstOffset(): Unit = {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there", "little", "bee")
-    seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(53, ms)
     val read = seg.read(41, 300).records
     checkEquals(ms.records.iterator, read.records.iterator)
   }
@@ -124,7 +123,7 @@ class LogSegmentTest {
   def testReadAfterLast(): Unit = {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, ms)
     val read = seg.read(52, 200)
     assertNull(read, "Read beyond the last offset in the segment should give null")
   }
@@ -137,9 +136,9 @@ class LogSegmentTest {
   def testReadFromGap(): Unit = {
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(61, ms2)
     val read = seg.read(55, 200)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
@@ -151,7 +150,7 @@ class LogSegmentTest {
     val maxSize = 1
     val seg = createSegment(40)
     val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, ms)
     // read before first offset
     var read = seg.read(48, maxSize, maxPosition, minOneMessage)
     assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata)
@@ -182,9 +181,9 @@ class LogSegmentTest {
     var offset = 40
     for (_ <- 0 until 30) {
       val ms1 = records(offset, "hello")
-      seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
+      seg.append(offset, ms1)
       val ms2 = records(offset + 1, "hello")
-      seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+      seg.append(offset + 1, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, 10000)
       assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList)
@@ -243,7 +242,7 @@ class LogSegmentTest {
     val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1)
     var offset = 40
     for (_ <- 0 until numMessages) {
-      seg.append(offset, offset, offset, records(offset, "hello"))
+      seg.append(offset, records(offset, "hello"))
       offset += 1
     }
     assertEquals(offset, seg.readNextOffset)
@@ -265,7 +264,17 @@ class LogSegmentTest {
     // test the case where we fully truncate the log
     val time = new MockTime
     val seg = createSegment(40, time = time)
-    seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+    seg.append(
+      41,
+      MemoryRecords.withRecords(
+        RecordBatch.MAGIC_VALUE_V1,
+        40,
+        Compression.NONE,
+        TimestampType.CREATE_TIME,
+        new SimpleRecord("hello".getBytes()),
+        new SimpleRecord("there".getBytes())
+      )
+    )
 
     // If the segment is empty after truncation, the create time should be reset
     time.sleep(500)
@@ -277,7 +286,7 @@ class LogSegmentTest {
     assertFalse(seg.offsetIndex.isFull)
     assertNull(seg.read(0, 1024), "Segment should be empty.")
 
-    seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there"))
+    seg.append(41, records(40, "hello", "there"))
   }
 
   /**
@@ -289,7 +298,7 @@ class LogSegmentTest {
     val seg = createSegment(40, messageSize * 2 - 1)
     // Produce some messages
     for (i <- 40 until 50)
-      seg.append(i, i * 10, i, records(i, s"msg$i"))
+      seg.append(i, records(i, s"msg$i"))
 
     assertEquals(490, seg.largestTimestamp)
     // Search for an indexed timestamp
@@ -313,7 +322,7 @@ class LogSegmentTest {
   def testNextOffsetCalculation(): Unit = {
     val seg = createSegment(40)
     assertEquals(40, seg.readNextOffset)
-    seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you"))
+    seg.append(52, records(50, "hello", "there", "you"))
     assertEquals(53, seg.readNextOffset)
   }
 
@@ -354,7 +363,7 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptIndex(): Unit = {
     val seg = createSegment(0)
     for (i <- 0 until 100)
-      seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
+      seg.append(i, records(i, i.toString))
     val indexFile = seg.offsetIndexFile
     writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(newProducerStateManager(), Optional.empty())
@@ -375,25 +384,22 @@ class LogSegmentTest {
     val pid2 = 10L
 
     // append transactional records from pid1
-    segment.append(101L, RecordBatch.NO_TIMESTAMP,
-      100L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
+    segment.append(101L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
       pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // append transactional records from pid2
-    segment.append(103L, RecordBatch.NO_TIMESTAMP, 102L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
+    segment.append(103L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
      pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // append non-transactional records
-    segment.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE,
+    segment.append(105L, MemoryRecords.withRecords(104L, Compression.NONE,
       partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     // abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed)
-    segment.append(106L, RecordBatch.NO_TIMESTAMP, 106L,
-      endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
+    segment.append(106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
 
     // commit the transaction from pid1
-    segment.append(107L, RecordBatch.NO_TIMESTAMP, 107L,
-      endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
+    segment.append(107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
 
     var stateManager = newProducerStateManager()
     segment.recover(stateManager, Optional.empty())
@@ -434,16 +440,16 @@ class LogSegmentTest {
     val checkpoint: LeaderEpochCheckpointFile = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1))
     val cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime()))
 
-    seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
+    seg.append(105L, MemoryRecords.withRecords(104L, Compression.NONE, 0,
       new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
-    seg.append(107L, RecordBatch.NO_TIMESTAMP, 106L, MemoryRecords.withRecords(106L, Compression.NONE, 1,
+    seg.append(107L, MemoryRecords.withRecords(106L, Compression.NONE, 1,
      new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
-    seg.append(109L, RecordBatch.NO_TIMESTAMP, 108L, MemoryRecords.withRecords(108L, Compression.NONE, 1,
+    seg.append(109L, MemoryRecords.withRecords(108L, Compression.NONE, 1,
      new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
-    seg.append(111L, RecordBatch.NO_TIMESTAMP, 110, MemoryRecords.withRecords(110L, Compression.NONE, 2,
+    seg.append(111L, MemoryRecords.withRecords(110L, Compression.NONE, 2,
      new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
     seg.recover(newProducerStateManager(), Optional.of(cache))
@@ -472,7 +478,7 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptTimeIndex(): Unit = {
     val seg = createSegment(0)
     for (i <- 0 until 100)
-      seg.append(i, i * 10, i, records(i, i.toString))
+      seg.append(i, records(i, i.toString))
     val timeIndexFile = seg.timeIndexFile
     writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
     seg.recover(newProducerStateManager(), Optional.empty())
@@ -492,7 +498,7 @@ class LogSegmentTest {
     for (_ <- 0 until 10) {
       val seg = createSegment(0)
       for (i <- 0 until messagesAppended)
-        seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
+        seg.append(i, records(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
 
       // start corrupting somewhere in the middle of the chosen record all the way to the end
@@ -523,9 +529,9 @@ class LogSegmentTest {
   def testCreateWithInitFileSizeAppendMessage(): Unit = {
     val seg = createSegment(40, fileAlreadyExists = false, 512*1024*1024, preallocate = true)
     val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(61, ms2)
     val read = seg.read(55, 200)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
@@ -544,9 +550,9 @@ class LogSegmentTest {
       512 * 1024 * 1024, true)
     val ms = records(50, "hello", "there")
-    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    seg.append(51, ms)
     val ms2 = records(60, "alpha", "beta")
-    seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(61, ms2)
     val read = seg.read(55, 200)
     checkEquals(ms2.records.iterator, read.records.records.iterator)
     val oldSize = seg.log.sizeInBytes()
@@ -581,9 +587,9 @@ class LogSegmentTest {
     //Given two messages with a gap between them (e.g. mid offset compacted away)
     val ms1 = records(offset, "first message")
-    seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1)
+    seg.append(offset, ms1)
     val ms2 = records(offset + 3, "message after gap")
-    seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2)
+    seg.append(offset + 3, ms2)
 
     // When we truncate to an offset without a corresponding log entry
     seg.truncateTo(offset + 1)
@@ -629,12 +635,80 @@ class LogSegmentTest {
     val segment = createSegment(1)
     assertEquals(Long.MaxValue, segment.getFirstBatchTimestamp)
 
-    segment.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes)))
+    segment.append(1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes)))
     assertEquals(1000L, segment.getFirstBatchTimestamp)
 
     segment.close()
   }
 
+  @Test
+  def testIndexForMultipleBatchesInMemoryRecords(): Unit = {
+    val segment = createSegment(0, 1, Time.SYSTEM)
+
+    val buffer1 = ByteBuffer.allocate(1024)
+
+    // append first batch to buffer1
+    var builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 0)
+    builder.append(0L, "key1".getBytes, "value1".getBytes)
+    builder.close()
+
+    // append second batch to buffer1
+    builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 1)
+    builder.append(1L, "key1".getBytes, "value1".getBytes)
+    builder.close()
+
+    buffer1.flip()
+    var record = MemoryRecords.readableRecords(buffer1)
+    segment.append(1L, record)
+
+    val buffer2 = ByteBuffer.allocate(1024)
+    // append first batch to buffer2
+    builder = MemoryRecords.builder(buffer2, Compression.NONE, TimestampType.CREATE_TIME, 2)
+    builder.append(2L, "key1".getBytes, "value1".getBytes)
+    builder.close()
+    buffer2.flip()
+    record = MemoryRecords.readableRecords(buffer2)
+    segment.append(2L, record)
+
+    assertEquals(2, segment.offsetIndex.entries)
+    assertEquals(1, segment.offsetIndex.entry(0).offset)
+    assertEquals(2, segment.offsetIndex.entry(1).offset)
+    assertEquals(2, segment.timeIndex.entries)
+    assertEquals(new TimestampOffset(1, 1), segment.timeIndex.entry(0))
+    assertEquals(new TimestampOffset(2, 2), segment.timeIndex.entry(1))
+  }
+
+  @Test
+  def testNonMonotonicTimestampForMultipleBatchesInMemoryRecords(): Unit = {
+    val segment = createSegment(0, 1, Time.SYSTEM)
+    val buffer1 = ByteBuffer.allocate(1024)
+
+    // append first batch to buffer1
+    var builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 0)
+    builder.append(1L, "key1".getBytes, "value1".getBytes)
+    builder.close()
+
+    // append second batch to buffer1
+    builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 1)
+    builder.append(0L, "key1".getBytes, "value1".getBytes)
+    builder.close()
+
+    // append third batch to buffer1
+    builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 2)
+    builder.append(2L, "key1".getBytes, "value1".getBytes)
+    builder.close()
+
+    buffer1.flip()
+    val record = MemoryRecords.readableRecords(buffer1)
+    segment.append(2L, record)
+
+    assertEquals(2, segment.offsetIndex.entries)
+    assertEquals(1, segment.offsetIndex.entry(0).offset)
+    assertEquals(2, segment.offsetIndex.entry(1).offset)
+    assertEquals(2, segment.timeIndex.entries)
+    assertEquals(new TimestampOffset(1, 0), segment.timeIndex.entry(0))
+    assertEquals(new TimestampOffset(2, 2), segment.timeIndex.entry(1))
+  }
+
   private def newProducerStateManager(): ProducerStateManager = {
     new ProducerStateManager(
       topicPartition,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 82b5999a6aa..362e6430af5 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -184,12 +184,6 @@ class LogValidatorTest {
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
 
     // If it's LOG_APPEND_TIME, the offset will be the offset of the first record
-    val expectedMaxTimestampOffset = magic match {
-      case RecordBatch.MAGIC_VALUE_V0 => -1
-      case RecordBatch.MAGIC_VALUE_V1 => 0
-      case _ => 2
-    }
-    assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestamp)
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
       compressed = false)
   }
@@ -233,8 +227,6 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
-      s"The shallow offset of max timestamp should be 2 if logAppendTime is used")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size may have been changed")
 
@@ -286,8 +278,6 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
-      s"The shallow offset of max timestamp should be the last offset 2 if logAppendTime is used")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
 
@@ -423,10 +413,8 @@ class LogValidatorTest {
     // V1: 3 batches are in the records, so the shallow OffsetOfMaxTimestamp is the timestamp of batch-1
     if (magic >= RecordBatch.MAGIC_VALUE_V2) {
       assertEquals(1, records.batches().asScala.size)
-      assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)
     } else {
       assertEquals(3, records.batches().asScala.size)
-      assertEquals(1, validatingResults.shallowOffsetOfMaxTimestamp)
     }
 
     assertFalse(validatingResults.messageSizeMaybeChanged,
@@ -508,7 +496,6 @@ class LogValidatorTest {
     // Both V2 and V1 has single batch in the validated records when compression is enable, and hence their shallow
     // OffsetOfMaxTimestamp is the last offset of the single batch
     assertEquals(1, validatedRecords.batches().asScala.size)
-    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)
     assertTrue(validatingResults.messageSizeMaybeChanged,
       "Message size should have been changed")
 
@@ -560,7 +547,6 @@ class LogValidatorTest {
     }
     assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
       s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
-    assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
 
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
@@ -607,8 +593,6 @@ class LogValidatorTest {
       assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(timestamp, validatedResults.maxTimestampMs)
-    assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp,
-      s"Offset of max timestamp should be the last offset 2.")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
 
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
@@ -679,9 +663,6 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatedResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    val expectedShallowOffsetOfMaxTimestamp = 2
-    assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp,
-      s"Shallow offset of max timestamp should be 2")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
 
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index bbaf547c8bc..672da631c7e 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4224,8 +4224,8 @@ class UnifiedLogTest {
     segments.add(seg2)
     assertEquals(Seq(Long.MaxValue, Long.MaxValue), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
 
-    seg1.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes)))
-    seg2.append(2, 2000L, 1, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord("two".getBytes)))
+    seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes)))
+    seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord(2000L, "two".getBytes)))
     assertEquals(Seq(1000L, 2000L), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)
 
     seg1.close()
diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
index 30528a6729d..a68428775a9 100644
--- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
+++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
@@ -106,7 +106,6 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
       lastOffset,
       lastEpoch,
       maxTimestamp,
-      shallowOffsetOfMaxTimestamp,
       Time.SYSTEM.milliseconds(),
       state.logStartOffset,
       RecordValidationStats.EMPTY,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 7d2e1997087..883e5a5acbd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -773,7 +773,6 @@ class ReplicaFetcherThreadTest {
       0,
       OptionalInt.empty,
       RecordBatch.NO_TIMESTAMP,
-      -1L,
       RecordBatch.NO_TIMESTAMP,
       -1L,
       RecordValidationStats.EMPTY,
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
index 05e162a3042..63a8a510818 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
@@ -31,13 +31,12 @@ import java.util.OptionalInt;
 public class LogAppendInfo {
 
     public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(-1, -1, OptionalInt.empty(),
-            RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
+            RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, -1L,
             RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
 
     private long firstOffset;
     private long lastOffset;
     private long maxTimestamp;
-    private long shallowOffsetOfMaxTimestamp;
    private long logAppendTime;
     private long logStartOffset;
     private RecordValidationStats recordValidationStats;
@@ -52,31 +51,29 @@ public class LogAppendInfo {
     /**
      * Creates an instance with the given params.
      *
-     * @param firstOffset                 The first offset in the message set unless the message format is less than V2 and we are appending
-     *                                    to the follower.
-     * @param lastOffset                  The last offset in the message set
-     * @param lastLeaderEpoch             The partition leader epoch corresponding to the last offset, if available.
-     * @param maxTimestamp                The maximum timestamp of the message set.
-     * @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp.
-     * @param logAppendTime               The log append time (if used) of the message set, otherwise Message.NoTimestamp
-     * @param logStartOffset              The start offset of the log at the time of this append.
-     * @param recordValidationStats       Statistics collected during record processing, `null` if `assignOffsets` is `false`
-     * @param sourceCompression           The source codec used in the message set (send by the producer)
-     * @param validBytes                  The number of valid bytes
-     * @param lastOffsetOfFirstBatch      The last offset of the first batch
+     * @param firstOffset            The first offset in the message set unless the message format is less than V2 and we are appending
+     *                               to the follower.
+     * @param lastOffset             The last offset in the message set
+     * @param lastLeaderEpoch        The partition leader epoch corresponding to the last offset, if available.
+     * @param maxTimestamp           The maximum timestamp of the message set.
+     * @param logAppendTime          The log append time (if used) of the message set, otherwise Message.NoTimestamp
+     * @param logStartOffset         The start offset of the log at the time of this append.
+     * @param recordValidationStats  Statistics collected during record processing, `null` if `assignOffsets` is `false`
+     * @param sourceCompression      The source codec used in the message set (send by the producer)
+     * @param validBytes             The number of valid bytes
+     * @param lastOffsetOfFirstBatch The last offset of the first batch
      */
     public LogAppendInfo(long firstOffset,
                          long lastOffset,
                          OptionalInt lastLeaderEpoch,
                          long maxTimestamp,
-                         long shallowOffsetOfMaxTimestamp,
                          long logAppendTime,
                          long logStartOffset,
                          RecordValidationStats recordValidationStats,
                          CompressionType sourceCompression,
                          int validBytes,
                          long lastOffsetOfFirstBatch) {
-        this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset,
+        this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset,
             recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(), LeaderHwChange.NONE);
     }
 
@@ -84,27 +81,25 @@ public class LogAppendInfo {
     /**
      * Creates an instance with the given params.
      *
-     * @param firstOffset                 The first offset in the message set unless the message format is less than V2 and we are appending
-     *                                    to the follower.
-     * @param lastOffset                  The last offset in the message set
-     * @param lastLeaderEpoch             The partition leader epoch corresponding to the last offset, if available.
-     * @param maxTimestamp                The maximum timestamp of the message set.
-     * @param shallowOffsetOfMaxTimestamp The last offset of the batch with the maximum timestamp.
-     * @param logAppendTime               The log append time (if used) of the message set, otherwise Message.NoTimestamp
-     * @param logStartOffset              The start offset of the log at the time of this append.
-     * @param recordValidationStats       Statistics collected during record processing, `null` if `assignOffsets` is `false`
-     * @param sourceCompression           The source codec used in the message set (send by the producer)
-     * @param validBytes                  The number of valid bytes
-     * @param lastOffsetOfFirstBatch      The last offset of the first batch
-     * @param recordErrors                List of record errors that caused the respective batch to be dropped
-     * @param leaderHwChange              Incremental if the high watermark needs to be increased after appending record
-     *                                    Same if high watermark is not changed. None is the default value and it means append failed
+     * @param firstOffset            The first offset in the message set unless the message format is less than V2 and we are appending
+     *                               to the follower.
+     * @param lastOffset             The last offset in the message set
+     * @param lastLeaderEpoch        The partition leader epoch corresponding to the last offset, if available.
+     * @param maxTimestamp           The maximum timestamp of the message set.
+     * @param logAppendTime          The log append time (if used) of the message set, otherwise Message.NoTimestamp
+     * @param logStartOffset         The start offset of the log at the time of this append.
+     * @param recordValidationStats  Statistics collected during record processing, `null` if `assignOffsets` is `false`
+     * @param sourceCompression      The source codec used in the message set (send by the producer)
+     * @param validBytes             The number of valid bytes
+     * @param lastOffsetOfFirstBatch The last offset of the first batch
+     * @param recordErrors           List of record errors that caused the respective batch to be dropped
+     * @param leaderHwChange         Incremental if the high watermark needs to be increased after appending record
+     *                               Same if high watermark is not changed. None is the default value and it means append failed
      */
     public LogAppendInfo(long firstOffset,
                          long lastOffset,
                          OptionalInt lastLeaderEpoch,
                          long maxTimestamp,
-                         long shallowOffsetOfMaxTimestamp,
                          long logAppendTime,
                          long logStartOffset,
                          RecordValidationStats recordValidationStats,
@@ -117,7 +112,6 @@ public class LogAppendInfo {
         this.lastOffset = lastOffset;
         this.lastLeaderEpoch = lastLeaderEpoch;
         this.maxTimestamp = maxTimestamp;
-        this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
         this.logAppendTime = logAppendTime;
         this.logStartOffset = logStartOffset;
         this.recordValidationStats = recordValidationStats;
@@ -156,14 +150,6 @@ public class LogAppendInfo {
         this.maxTimestamp = maxTimestamp;
     }
 
-    public long shallowOffsetOfMaxTimestamp() {
-        return shallowOffsetOfMaxTimestamp;
-    }
-
-    public void setShallowOffsetOfMaxTimestamp(long shallowOffsetOfMaxTimestamp) {
-        this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
-    }
-
     public long logAppendTime() {
         return logAppendTime;
     }
@@ -233,12 +219,12 @@ public class LogAppendInfo {
      * @return a new instance with the given LeaderHwChange
      */
     public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
-        return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
+        return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
                 sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange);
     }
 
     public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStartOffset) {
-        return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+        return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
                 RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
     }
 
@@ -248,7 +234,7 @@ public class LogAppendInfo {
      * in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors
      */
     public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List<RecordError> recordErrors) {
-        return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+        return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
                 RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE);
     }
 
@@ -259,7 +245,6 @@ public class LogAppendInfo {
                 ", lastOffset=" + lastOffset +
                 ", lastLeaderEpoch=" + lastLeaderEpoch +
                 ", maxTimestamp=" + maxTimestamp +
-                ", shallowOffsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp +
                 ", logAppendTime=" + logAppendTime +
                 ", logStartOffset=" + logStartOffset +
                 ", recordConversionStats=" + recordValidationStats +
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 969fa8b1db4..c02cbd2deb8 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -237,38 +237,38 @@ public class LogSegment implements Closeable {
      * It is assumed this method is being called from within a lock, it is not thread-safe otherwise.
      *
      * @param largestOffset The last offset in the message set
-     * @param largestTimestampMs The largest timestamp in the message set.
-     * @param shallowOffsetOfMaxTimestamp The last offset of earliest batch with max timestamp in the messages to append.
-     * @param records The log entries to append.
+     * @param records The log entries to append.
      * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
      */
     public void append(long largestOffset,
-                       long largestTimestampMs,
-                       long shallowOffsetOfMaxTimestamp,
                        MemoryRecords records) throws IOException {
         if (records.sizeInBytes() > 0) {
-            LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",
-                records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);
+            LOGGER.trace("Inserting {} bytes at end offset {} at position {}",
+                records.sizeInBytes(), largestOffset, log.sizeInBytes());
             int physicalPosition = log.sizeInBytes();
-            if (physicalPosition == 0)
-                rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
 
             ensureOffsetInRange(largestOffset);
 
             // append the messages
             long appendedBytes = log.append(records);
             LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
-            // Update the in memory max timestamp and corresponding offset.
-            if (largestTimestampMs > maxTimestampSoFar()) {
-                maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
+
+            for (RecordBatch batch : records.batches()) {
+                long batchMaxTimestamp = batch.maxTimestamp();
+                long batchLastOffset = batch.lastOffset();
+                if (batchMaxTimestamp > maxTimestampSoFar()) {
+                    maxTimestampAndOffsetSoFar = new TimestampOffset(batchMaxTimestamp, batchLastOffset);
+                }
+
+                if (bytesSinceLastIndexEntry > indexIntervalBytes) {
+                    offsetIndex().append(batchLastOffset, physicalPosition);
+                    timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
+                    bytesSinceLastIndexEntry = 0;
+                }
+                int sizeInBytes = batch.sizeInBytes();
+                physicalPosition += sizeInBytes;
+                bytesSinceLastIndexEntry += sizeInBytes;
             }
-            // append an entry to the index (if needed)
-            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-                offsetIndex().append(largestOffset, physicalPosition);
-                timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
-                bytesSinceLastIndexEntry = 0;
-            }
-            bytesSinceLastIndexEntry += records.sizeInBytes();
         }
     }
 
@@ -279,8 +279,6 @@ public class LogSegment implements Closeable {
     private int appendChunkFromFile(FileRecords records, int position, BufferSupplier bufferSupplier) throws IOException {
         int bytesToAppend = 0;
-        long maxTimestamp = Long.MIN_VALUE;
-        long shallowOffsetOfMaxTimestamp = Long.MIN_VALUE;
         long maxOffset = Long.MIN_VALUE;
         ByteBuffer readBuffer = bufferSupplier.get(1024 * 1024);
 
@@ -289,10 +287,6 @@ public class LogSegment implements Closeable {
         Iterator<FileChannelRecordBatch> nextBatches = records.batchesFrom(position).iterator();
         FileChannelRecordBatch batch;
         while ((batch = nextAppendableBatch(nextBatches, readBuffer, bytesToAppend)) != null) {
-            if (batch.maxTimestamp() > maxTimestamp) {
-                maxTimestamp = batch.maxTimestamp();
-                shallowOffsetOfMaxTimestamp = batch.lastOffset();
-            }
             maxOffset = batch.lastOffset();
             bytesToAppend += batch.sizeInBytes();
         }
@@ -305,7 +299,7 @@ public class LogSegment implements Closeable {
             readBuffer.limit(bytesToAppend);
             records.readInto(readBuffer, position);
 
-            append(maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp, MemoryRecords.readableRecords(readBuffer));
+            append(maxOffset, MemoryRecords.readableRecords(readBuffer));
         }
 
         bufferSupplier.release(readBuffer);
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
index 53f7cc36318..1521f74ac87 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java
@@ -69,20 +69,15 @@ public class LogValidator {
         public final long logAppendTimeMs;
         public final MemoryRecords validatedRecords;
         public final long maxTimestampMs;
-        // we only maintain batch level offset for max timestamp since we want to align the behavior of updating time
-        // indexing entries. The paths of follower append and replica recovery do not iterate all records, so they have no
-        // idea about record level offset for max timestamp.
-        public final long shallowOffsetOfMaxTimestamp;
         public final boolean messageSizeMaybeChanged;
         public final RecordValidationStats recordValidationStats;
 
         public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs,
-                                long shallowOffsetOfMaxTimestamp, boolean messageSizeMaybeChanged,
+                                boolean messageSizeMaybeChanged,
                                 RecordValidationStats recordValidationStats) {
             this.logAppendTimeMs = logAppendTimeMs;
             this.validatedRecords = validatedRecords;
             this.maxTimestampMs = maxTimestampMs;
-            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
             this.messageSizeMaybeChanged = messageSizeMaybeChanged;
             this.recordValidationStats = recordValidationStats;
         }
@@ -236,7 +231,6 @@ public class LogValidator {
                 now,
                 convertedRecords,
                 info.maxTimestamp,
-                info.shallowOffsetOfMaxTimestamp,
                 true,
                 recordValidationStats);
     }
@@ -246,8 +240,6 @@ public class LogValidator {
                                                                      MetricsRecorder metricsRecorder) {
         long now = time.milliseconds();
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        long shallowOffsetOfMaxTimestamp = -1L;
-        long initialOffset = offsetCounter.value;
 
         RecordBatch firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, CompressionType.NONE);
 
@@ -276,7 +268,6 @@ public class LogValidator {
 
             if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
                 maxTimestamp = maxBatchTimestamp;
-                shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
             }
 
             batch.setLastOffset(offsetCounter.value - 1);
@@ -293,23 +284,10 @@ public class LogValidator {
         }
 
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            maxTimestamp = now;
-            // those checks should be equal to MemoryRecordsBuilder#info
-            switch (toMagic) {
-                case RecordBatch.MAGIC_VALUE_V0:
-                    maxTimestamp = RecordBatch.NO_TIMESTAMP;
-                    // value will be the default value: -1
-                    shallowOffsetOfMaxTimestamp = -1;
-                    break;
-                case RecordBatch.MAGIC_VALUE_V1:
-                    // Those single-record batches have same max timestamp, so the initial offset is equal with
-                    // the last offset of earliest batch
-                    shallowOffsetOfMaxTimestamp = initialOffset;
-                    break;
-                default:
-                    // there is only one batch so use the last offset
-                    shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
-                    break;
+            if (toMagic == RecordBatch.MAGIC_VALUE_V0) {
+                maxTimestamp = RecordBatch.NO_TIMESTAMP;
+            } else {
+                maxTimestamp = now;
             }
         }
 
@@ -317,7 +295,6 @@ public class LogValidator {
                 now,
                 records,
                 maxTimestamp,
-                shallowOffsetOfMaxTimestamp,
                 false,
                 RecordValidationStats.EMPTY);
     }
@@ -445,7 +422,6 @@ public class LogValidator {
                 now,
                 records,
                 maxTimestamp,
-                lastOffset,
                 false,
                 recordValidationStats);
     }
@@ -487,7 +463,6 @@ public class LogValidator {
                 logAppendTime,
                 records,
                 info.maxTimestamp,
-                info.shallowOffsetOfMaxTimestamp,
                 true,
                 recordValidationStats);
     }
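
Note on the new append path: LogSegment#append now derives max-timestamp tracking per batch from the MemoryRecords it receives instead of taking largestTimestampMs and shallowOffsetOfMaxTimestamp from the caller. A minimal sketch of that per-batch scan follows; it is illustrative only, and the class and method names are hypothetical rather than part of this patch. It uses only the RecordBatch accessors already visible in the diff above.

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.storage.internals.log.TimestampOffset;

public final class MaxTimestampExample {

    // Hypothetical helper: scan the batches of a MemoryRecords and track the maximum
    // batch timestamp together with the last offset of the batch holding it. This is
    // the same batch-level granularity LogSegment#append now uses when it updates
    // maxTimestampAndOffsetSoFar and appends time-index entries.
    public static TimestampOffset maxTimestampAndOffset(MemoryRecords records) {
        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
        long offsetOfMaxTimestamp = -1L;
        for (RecordBatch batch : records.batches()) {
            if (batch.maxTimestamp() > maxTimestamp) {
                maxTimestamp = batch.maxTimestamp();
                offsetOfMaxTimestamp = batch.lastOffset();
            }
        }
        return new TimestampOffset(maxTimestamp, offsetOfMaxTimestamp);
    }
}

A caller that previously passed shallowOffsetOfMaxTimestamp into append() could compute the same value this way from the records it is about to append, which is why the extra parameters (and the corresponding LogAppendInfo and ValidationResult fields) could be dropped.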