mirror of https://github.com/apache/kafka.git
KAFKA-16341 fix the LogValidator for non-compressed type (#15476)
- Fix the verifying logic: if the timestamp type is LOG_APPEND_TIME, choose the offset of the first record; otherwise, choose the offset of the record with the max timestamp.
- Rename shallowOffsetOfMaxTimestamp to offsetOfMaxTimestamp.

Reviewers: Jun Rao <junrao@gmail.com>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
parent 1d6e0b8727
commit bf3f088c94
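For orientation before the diff: a minimal standalone Java sketch of the selection rule this commit enforces. The Rec holder and the method below are hypothetical illustrations, not Kafka's actual LogValidator code.

import java.util.List;

public class OffsetOfMaxTimestampSketch {
    // Hypothetical record holder, for illustration only.
    record Rec(long offset, long timestamp) {}

    // With LOG_APPEND_TIME every record carries the same broker-assigned
    // timestamp, so the first record's offset is reported; otherwise the
    // offset of the record with the greatest timestamp is kept (the strict
    // '>' means the earliest such record wins on ties).
    static long offsetOfMaxTimestamp(boolean logAppendTime, List<Rec> recs) {
        if (logAppendTime)
            return recs.get(0).offset();
        Rec max = recs.get(0);
        for (Rec r : recs)
            if (r.timestamp() > max.timestamp())
                max = r;
        return max.offset();
    }
}

Before this commit, non-compressed v2 batches instead reported the batch's last assigned offset; the hunks below remove that special case and rename the field throughout.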
@@ -209,7 +209,7 @@ public class MemoryRecords extends AbstractRecords {
                             partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);

                 MemoryRecordsBuilder.RecordsInfo info = builder.info();
-                filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
+                filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.offsetOfMaxTimestamp,
                         maxOffset, retainedRecords.size(), filteredBatchSize);
             }
         }
@@ -399,7 +399,7 @@ public class MemoryRecords extends AbstractRecords {
         private int bytesRetained = 0;
         private long maxOffset = -1L;
         private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        private long shallowOffsetOfMaxTimestamp = -1L;
+        private long offsetOfMaxTimestamp = -1L;

         private FilterResult(ByteBuffer outputBuffer) {
             this.outputBuffer = outputBuffer;
@@ -411,21 +411,21 @@ public class MemoryRecords extends AbstractRecords {
                     retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
         }

-        private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
+        private void updateRetainedBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset,
                                                  int messagesRetained, int bytesRetained) {
-            validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset);
+            validateBatchMetadata(maxTimestamp, offsetOfMaxTimestamp, maxOffset);
             if (maxTimestamp > this.maxTimestamp) {
                 this.maxTimestamp = maxTimestamp;
-                this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+                this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
             }
             this.maxOffset = Math.max(maxOffset, this.maxOffset);
             this.messagesRetained += messagesRetained;
             this.bytesRetained += bytesRetained;
         }

-        private void validateBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset) {
-            if (maxTimestamp != RecordBatch.NO_TIMESTAMP && shallowOffsetOfMaxTimestamp < 0)
-                throw new IllegalArgumentException("shallowOffset undefined for maximum timestamp " + maxTimestamp);
+        private void validateBatchMetadata(long maxTimestamp, long offsetOfMaxTimestamp, long maxOffset) {
+            if (maxTimestamp != RecordBatch.NO_TIMESTAMP && offsetOfMaxTimestamp < 0)
+                throw new IllegalArgumentException("offset undefined for maximum timestamp " + maxTimestamp);
             if (maxOffset < 0)
                 throw new IllegalArgumentException("maxOffset undefined");
         }
@@ -458,8 +458,8 @@ public class MemoryRecords extends AbstractRecords {
             return maxTimestamp;
         }

-        public long shallowOffsetOfMaxTimestamp() {
-            return shallowOffsetOfMaxTimestamp;
+        public long offsetOfMaxTimestamp() {
+            return offsetOfMaxTimestamp;
         }
     }
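The strict '>' in updateRetainedBatchMetadata above means a tie keeps the earliest offset seen. A self-contained sketch of that running-max rule (a standalone class for illustration, not Kafka's FilterResult itself):

public class RunningMaxSketch {
    private long maxTimestamp = -1L;          // stands in for RecordBatch.NO_TIMESTAMP
    private long offsetOfMaxTimestamp = -1L;

    // Same shape as FilterResult.updateRetainedBatchMetadata: only a strictly
    // larger timestamp moves the tracked offset.
    void update(long batchMaxTimestamp, long batchOffsetOfMax) {
        if (batchMaxTimestamp > maxTimestamp) {
            maxTimestamp = batchMaxTimestamp;
            offsetOfMaxTimestamp = batchOffsetOfMax;
        }
    }

    public static void main(String[] args) {
        RunningMaxSketch s = new RunningMaxSketch();
        s.update(10L, 0L);   // first batch sets the pair (10, 0)
        s.update(10L, 5L);   // tie: offset 0 is kept
        s.update(12L, 7L);   // strictly larger: pair becomes (12, 7)
        System.out.println(s.maxTimestamp + " @ " + s.offsetOfMaxTimestamp); // 12 @ 7
    }
}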
@@ -851,12 +851,12 @@ public class MemoryRecordsBuilder implements AutoCloseable {

     public static class RecordsInfo {
         public final long maxTimestamp;
-        public final long shallowOffsetOfMaxTimestamp;
+        public final long offsetOfMaxTimestamp;

         public RecordsInfo(long maxTimestamp,
-                           long shallowOffsetOfMaxTimestamp) {
+                           long offsetOfMaxTimestamp) {
             this.maxTimestamp = maxTimestamp;
-            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+            this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
         }
     }
@@ -378,7 +378,7 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(logAppendTime, info.maxTimestamp);
         // When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp
-        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+        assertEquals(0L, info.offsetOfMaxTimestamp);

         for (RecordBatch batch : records.batches()) {
             if (magic == MAGIC_VALUE_V0) {
@@ -414,9 +414,9 @@ public class MemoryRecordsBuilderTest {

         if (magic == MAGIC_VALUE_V0)
             // in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1.
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(-1L, info.offsetOfMaxTimestamp);
         else
-            assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(1L, info.offsetOfMaxTimestamp);

         int i = 0;
         long[] expectedTimestamps = new long[] {0L, 2L, 1L};
@@ -495,10 +495,10 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         if (magic == MAGIC_VALUE_V0) {
             assertEquals(-1, info.maxTimestamp);
-            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(-1L, info.offsetOfMaxTimestamp);
         } else {
             assertEquals(2L, info.maxTimestamp);
-            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(2L, info.offsetOfMaxTimestamp);
         }

         long i = 0L;
@@ -352,7 +352,7 @@ public class MemoryRecordsTest {
         assertEquals(0, filterResult.messagesRetained());
         assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
         assertEquals(12, filterResult.maxTimestamp());
-        assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp());
+        assertEquals(baseOffset + 1, filterResult.offsetOfMaxTimestamp());

         // Verify filtered records
         filtered.flip();
@@ -413,7 +413,7 @@ public class MemoryRecordsTest {
         assertEquals(0, filterResult.messagesRetained());
         assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
         assertEquals(timestamp, filterResult.maxTimestamp());
-        assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp());
+        assertEquals(baseOffset, filterResult.offsetOfMaxTimestamp());
         assertTrue(filterResult.outputBuffer().position() > 0);

         // Verify filtered records
@@ -893,7 +893,7 @@ public class MemoryRecordsTest {
         assertEquals(filtered.limit(), result.bytesRetained());
         if (magic > RecordBatch.MAGIC_VALUE_V0) {
             assertEquals(20L, result.maxTimestamp());
-            assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
+            assertEquals(4L, result.offsetOfMaxTimestamp());
         }

         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -406,8 +406,8 @@ class LocalLog(@volatile private var _dir: File,
       }
     }

-  private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
-    segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
+  private[log] def append(lastOffset: Long, largestTimestamp: Long, offsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
+    segments.activeSegment.append(lastOffset, largestTimestamp, offsetOfMaxTimestamp, records)
     updateLogEndOffset(lastOffset + 1)
   }
@@ -812,7 +812,7 @@ private[log] class Cleaner(val id: Int,
       val retained = MemoryRecords.readableRecords(outputBuffer)
       // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
       // after `Log.replaceSegments` (which acquires the lock) is called
-      dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp, retained)
+      dest.append(result.maxOffset, result.maxTimestamp, result.offsetOfMaxTimestamp, retained)
       throttler.maybeThrottle(outputBuffer.limit())
     }
@@ -819,7 +819,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,

           validRecords = validateAndOffsetAssignResult.validatedRecords
           appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
-          appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs)
+          appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.offsetOfMaxTimestampMs)
           appendInfo.setLastOffset(offset.value - 1)
           appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
@@ -68,9 +68,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     verifyListOffsets()

     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
     // So in this one batch test, it'll be the first offset 0
@@ -79,9 +77,30 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+  def testThreeNonCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch()
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    setUpForLogAppendTimeCase()
+    produceMessagesInOneBatch(topic=topicNameWithCustomConfigs)
+    // In LogAppendTime's case, if the timestamps are the same, we choose the offset of the first record
+    // thus, the maxTimestampOffset should be the first record of the batch.
+    // So in this one batch test, it'll be the first offset which is 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeNonCompressedRecordsInSeparateBatch(quorum: String): Unit = {
     produceMessagesInSeparateBatch()
     verifyListOffsets()
+
+    // test LogAppendTime case
+    setUpForLogAppendTimeCase()
+    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+    // In LogAppendTime's case, if the timestamp is different, it should be the last one
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
   }

   // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
@@ -93,9 +112,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     verifyListOffsets()

     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
     // So in this one batch test, it'll be the first offset 0
@@ -111,9 +128,7 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     verifyListOffsets()

     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
     // So in this separate batch test, it'll be the last offset 2
@@ -147,15 +162,19 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
     verifyListOffsets()

     // test LogAppendTime case
-    val props: Properties = new Properties()
-    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
-    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    setUpForLogAppendTimeCase()
     produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
     // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
     // So in this separate batch test, it'll be the last offset 2
     verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
   }

+  private def setUpForLogAppendTimeCase(): Unit = {
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+  }
+
   private def createOldMessageFormatBrokers(): Unit = {
     setOldMessageFormat = true
     recreateBrokers(reconfigure = true, startup = true)
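These integration tests exercise the ListOffsets path end to end. For reference, a hedged sketch of how a client would observe the fixed behavior through the admin API (the bootstrap address and topic name below are assumed; OffsetSpec.maxTimestamp() is the KIP-734 spec these tests verify):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class MaxTimestampOffsetLookup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic
            // Ask for the offset of the record with the largest timestamp.
            ListOffsetsResult result = admin.listOffsets(Map.of(tp, OffsetSpec.maxTimestamp()));
            ListOffsetsResult.ListOffsetsResultInfo info = result.partitionResult(tp).get();
            System.out.printf("offset=%d timestamp=%d%n", info.offset(), info.timestamp());
        }
    }
}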
@@ -100,7 +100,7 @@ class LocalLogTest {
                      initialOffset: Long = 0L): Unit = {
     log.append(lastOffset = initialOffset + records.size - 1,
       largestTimestamp = records.head.timestamp,
-      shallowOffsetOfMaxTimestamp = initialOffset,
+      offsetOfMaxTimestamp = initialOffset,
       records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 0, records.toList : _*))
   }
@@ -86,10 +86,10 @@ class LogSegmentTest {
   def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = {
     val seg = createSegment(baseOffset)
     val currentTime = Time.SYSTEM.milliseconds()
-    val shallowOffsetOfMaxTimestamp = largestOffset
+    val offsetOfMaxTimestamp = largestOffset
     val memoryRecords = records(0, "hello")
     assertThrows(classOf[LogSegmentOffsetOverflowException], () => {
-      seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords)
+      seg.append(largestOffset, currentTime, offsetOfMaxTimestamp, memoryRecords)
     })
   }
@@ -173,9 +173,9 @@ class LogValidatorTest {
     assertEquals(now, validatedResults.maxTimestampMs, s"Max timestamp should be $now")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")

-    // we index from last offset in version 2 instead of base offset
-    val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0
-    assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs,
+    // If it's LOG_APPEND_TIME, the offset will be the offset of the first record
+    val expectedMaxTimestampOffset = 0
+    assertEquals(expectedMaxTimestampOffset, validatedResults.offsetOfMaxTimestampMs,
       s"The offset of max timestamp should be $expectedMaxTimestampOffset")
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
       compressed = false)
@@ -219,7 +219,7 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
       s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size may have been changed")
@@ -271,7 +271,7 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
       s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
@@ -404,14 +404,8 @@ class LogValidatorTest {
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")

-    val expectedShallowOffsetOfMaxTimestamp = if (magic >= RecordVersion.V2.value) {
-      // v2 records are always batched, even when not compressed.
-      // the shallow offset of max timestamp is the last offset of the batch
-      recordList.size - 1
-    } else {
-      1
-    }
-    assertEquals(expectedShallowOffsetOfMaxTimestamp, validatingResults.shallowOffsetOfMaxTimestampMs,
+    val expectedOffsetOfMaxTimestamp = 1
+    assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 1")

     assertFalse(validatingResults.messageSizeMaybeChanged,
@@ -486,7 +480,7 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(1, validatingResults.offsetOfMaxTimestampMs,
       "Offset of max timestamp should be 1")
     assertTrue(validatingResults.messageSizeMaybeChanged,
       "Message size should have been changed")
@@ -538,7 +532,7 @@ class LogValidatorTest {
     }
     assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
       s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
-    assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(-1, validatedResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be -1")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
@@ -585,7 +579,7 @@ class LogValidatorTest {
       assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(timestamp, validatedResults.maxTimestampMs)
-    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+    assertEquals(0, validatedResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 0 when multiple records having the same max timestamp.")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
@@ -657,8 +651,8 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")

-    val expectedShallowOffsetOfMaxTimestamp = 1
-    assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestampMs,
+    val expectedOffsetOfMaxTimestamp = 1
+    assertEquals(expectedOffsetOfMaxTimestamp, validatedResults.offsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 1")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
@@ -232,17 +232,17 @@ public class LogSegment implements Closeable {
      *
      * @param largestOffset The last offset in the message set
      * @param largestTimestampMs The largest timestamp in the message set.
-     * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
+     * @param offsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
      * @param records The log entries to append.
      * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
      */
     public void append(long largestOffset,
                        long largestTimestampMs,
-                       long shallowOffsetOfMaxTimestamp,
+                       long offsetOfMaxTimestamp,
                        MemoryRecords records) throws IOException {
         if (records.sizeInBytes() > 0) {
-            LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at shallow offset {}",
-                records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, shallowOffsetOfMaxTimestamp);
+            LOGGER.trace("Inserting {} bytes at end offset {} at position {} with largest timestamp {} at offset {}",
+                records.sizeInBytes(), largestOffset, log.sizeInBytes(), largestTimestampMs, offsetOfMaxTimestamp);
             int physicalPosition = log.sizeInBytes();
             if (physicalPosition == 0)
                 rollingBasedTimestamp = OptionalLong.of(largestTimestampMs);
@@ -254,7 +254,7 @@ public class LogSegment implements Closeable {
             LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
             // Update the in memory max timestamp and corresponding offset.
             if (largestTimestampMs > maxTimestampSoFar()) {
-                maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
+                maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, offsetOfMaxTimestamp);
             }
             // append an entry to the index (if needed)
             if (bytesSinceLastIndexEntry > indexIntervalBytes) {
@@ -68,17 +68,17 @@ public class LogValidator {
         public final long logAppendTimeMs;
         public final MemoryRecords validatedRecords;
         public final long maxTimestampMs;
-        public final long shallowOffsetOfMaxTimestampMs;
+        public final long offsetOfMaxTimestampMs;
         public final boolean messageSizeMaybeChanged;
         public final RecordValidationStats recordValidationStats;

         public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs,
-                                long shallowOffsetOfMaxTimestampMs, boolean messageSizeMaybeChanged,
+                                long offsetOfMaxTimestampMs, boolean messageSizeMaybeChanged,
                                 RecordValidationStats recordValidationStats) {
             this.logAppendTimeMs = logAppendTimeMs;
             this.validatedRecords = validatedRecords;
             this.maxTimestampMs = maxTimestampMs;
-            this.shallowOffsetOfMaxTimestampMs = shallowOffsetOfMaxTimestampMs;
+            this.offsetOfMaxTimestampMs = offsetOfMaxTimestampMs;
             this.messageSizeMaybeChanged = messageSizeMaybeChanged;
             this.recordValidationStats = recordValidationStats;
         }
@@ -149,7 +149,7 @@ public class LogValidator {
      * avoid expensive re-compression.
      *
      * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
-     * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed.
+     * of the message with the max timestamp and a boolean indicating whether the message sizes may have changed.
      */
     public ValidationResult validateMessagesAndAssignOffsets(PrimitiveRef.LongRef offsetCounter,
                                                              MetricsRecorder metricsRecorder,
@@ -232,7 +232,7 @@ public class LogValidator {
             now,
             convertedRecords,
             info.maxTimestamp,
-            info.shallowOffsetOfMaxTimestamp,
+            info.offsetOfMaxTimestamp,
             true,
             recordValidationStats);
     }
@@ -296,10 +296,6 @@ public class LogValidator {
             offsetOfMaxTimestamp = initialOffset;
         }

-        if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
-            offsetOfMaxTimestamp = offsetCounter.value - 1;
-        }
-
         return new ValidationResult(
             now,
             records,
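The four deleted lines above are the heart of KAFKA-16341: for magic >= 2 the validator unconditionally overwrote offsetOfMaxTimestamp with the last assigned offset, so non-compressed batches reported the wrong offset for the max timestamp. A worked sketch of the behavior change under assumed values (all numbers are hypothetical, not taken from the patch):

public class Kafka16341Sketch {
    public static void main(String[] args) {
        long[] timestamps = {5L, 9L, 7L};   // assumed record timestamps
        long initialOffset = 100L;          // assumed first assigned offset

        // Tracked during validation: record 101 carries the max timestamp 9.
        long offsetOfMaxTimestamp = initialOffset + 1;

        // Before this commit (toMagic >= 2): overridden to the last assigned offset.
        long before = initialOffset + timestamps.length - 1;   // 102, the wrong record

        // After this commit: the tracked per-record offset stands.
        long after = offsetOfMaxTimestamp;                     // 101, the correct record

        System.out.println("before fix: " + before + ", after fix: " + after);
    }
}

With the override gone, a ListOffsets query for the max-timestamp offset on a non-compressed batch returns the record that actually carries the max timestamp, which is what the integration tests above assert.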
@@ -480,7 +476,7 @@ public class LogValidator {
             logAppendTime,
             records,
             info.maxTimestamp,
-            info.shallowOffsetOfMaxTimestamp,
+            info.offsetOfMaxTimestamp,
             true,
             recordValidationStats);
     }