mirror of https://github.com/apache/kafka.git
KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records (#15542)
Backported #15474 to the v3.6 branch. Since the code diff is larger there, I'd like to make sure the backport doesn't break any tests. Fixes getOffsetByMaxTimestamp for compressed records. This PR: 1. For the inPlaceAssignment case, computes the correct offset for maxTimestamp while traversing the batch records, and sets it on the ValidationResult at the end, instead of always using the last offset. 2. For the non-inPlaceAssignment case, sets the offsetOfMaxTimestamp for log create time, as in the non-compressed and inPlaceAssignment cases, instead of always using the last offset. 3. Adds tests to verify the fix. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent bbd273b70c
commit 8aa39869aa
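For context, the broker path fixed here is what serves ListOffsets requests with the max-timestamp spec (KIP-734). A minimal client-side sketch of such a query — the bootstrap address and topic name are illustrative assumptions, not part of this commit:

import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class MaxTimestampLookup {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(
                Map.<String, Object>of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            TopicPartition tp = new TopicPartition("foo", 0);
            ListOffsetsResult.ListOffsetsResultInfo info =
                admin.listOffsets(Map.of(tp, OffsetSpec.maxTimestamp())).all().get().get(tp);
            // With this fix, offset() is the offset of the record carrying the max
            // timestamp even when the batch is compressed, not the batch's last offset.
            System.out.println(info.offset() + " @ " + info.timestamp());
        }
    }
}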
@@ -242,34 +242,23 @@ public class MemoryRecordsBuilder implements AutoCloseable {
     /**
      * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
+     * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
      *
-     * If the log append time is used, the offset will be the last offset unless no compression is used and
-     * the message format version is 0 or 1, in which case, it will be the first offset.
+     * If the log append time is used, the offset will be the first offset of the record.
      *
-     * If create time is used, the offset will be the last offset unless no compression is used and the message
-     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
+     * If create time is used, the offset will always be the offset of the record with the max timestamp.
      *
+     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+     *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = baseOffset;
-            return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
-        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
-            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
+            return new RecordsInfo(logAppendTime, baseOffset);
         } else {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
+            // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
+            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
         }
     }
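The effect of the new info() on a compressed, create-time batch, as a runnable sketch (MemoryRecordsBuilder and friends are internal classes from the org.apache.kafka.common.record package, used here purely for illustration; the 100/999/200 timestamps mirror the tests below):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class InfoSketch {
    public static void main(String[] args) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024),
            RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, TimestampType.CREATE_TIME, 0L);
        builder.append(100L, null, "a".getBytes(StandardCharsets.UTF_8));
        builder.append(999L, null, "b".getBytes(StandardCharsets.UTF_8)); // max timestamp at offset 1
        builder.append(200L, null, "c".getBytes(StandardCharsets.UTF_8));
        builder.close();

        MemoryRecordsBuilder.RecordsInfo info = builder.info();
        // Before the fix a compressed batch reported lastOffset (2) here; with the
        // fix it reports the offset of the max-timestamp record (1).
        System.out.println(info.maxTimestamp + " @ " + info.shallowOffsetOfMaxTimestamp);
    }
}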
@@ -378,10 +378,8 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(logAppendTime, info.maxTimestamp);
 
-        if (args.compressionType == CompressionType.NONE && magic <= MAGIC_VALUE_V1)
-            assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
-        else
-            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+        // When logAppendTime is used, the first offset of the batch will be the offset of maxTimestamp
+        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
 
         for (RecordBatch batch : records.batches()) {
             if (magic == MAGIC_VALUE_V0) {
@@ -415,10 +413,11 @@ public class MemoryRecordsBuilderTest {
             assertEquals(2L, info.maxTimestamp);
         }
 
-        if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1)
-            assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
+        if (magic == MAGIC_VALUE_V0)
+            // in MAGIC_VALUE_V0's case, we don't have timestamp info in records, so always return -1.
+            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
         else
-            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
 
         int i = 0;
         long[] expectedTimestamps = new long[] {0L, 2L, 1L};
@@ -495,12 +494,13 @@ public class MemoryRecordsBuilderTest {
         MemoryRecords records = builder.build();
 
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
-        if (magic == MAGIC_VALUE_V0)
+        if (magic == MAGIC_VALUE_V0) {
             assertEquals(-1, info.maxTimestamp);
-        else
+            assertEquals(-1L, info.shallowOffsetOfMaxTimestamp);
+        } else {
             assertEquals(2L, info.maxTimestamp);
-
-        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+            assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
+        }
 
         long i = 0L;
         for (RecordBatch batch : records.batches()) {
@@ -893,10 +893,7 @@ public class MemoryRecordsTest {
         assertEquals(filtered.limit(), result.bytesRetained());
         if (magic > RecordBatch.MAGIC_VALUE_V0) {
             assertEquals(20L, result.maxTimestamp());
-            if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2)
-                assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
-            else
-                assertEquals(5L, result.shallowOffsetOfMaxTimestamp());
+            assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
         }
 
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -19,77 +19,235 @@ package kafka.admin
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.TestUtils.{createProducer, plaintextBootstrapServers}
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.Properties
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
 
   val topicName = "foo"
+  val topicNameWithCustomConfigs = "foo2"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
+  val mockTime: Time = new MockTime(1)
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    createTopic(topicName, 1, 1.toShort)
-    produceMessages()
+    createTopicWithConfig(topicName, new Properties())
     adminClient = Admin.create(Map[String, Object](
       AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
     ).asJava)
   }
 
+  override def brokerTime(brokerId: Int): Time = mockTime
+
   @AfterEach
   override def tearDown(): Unit = {
+    setOldMessageFormat = false
     Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
     super.tearDown()
   }
 
-  @Test
-  def testEarliestOffset(): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
+    produceMessagesInOneBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()
+  }
+
+  // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testThreeRecordsInOneBatchWithMessageConversion(quorum: String): Unit = {
+    createOldMessageFormatBrokers()
+    produceMessagesInOneBatch()
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this one batch test, it'll be the first offset 0
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 0)
+  }
+
+  // The message conversion test only run in ZK mode because KRaft mode doesn't support "inter.broker.protocol.version" < 3.0
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testThreeRecordsInSeparateBatchWithMessageConversion(quorum: String): Unit = {
+    createOldMessageFormatBrokers()
+    produceMessagesInSeparateBatch()
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this separate batch test, it'll be the last offset 2
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInOneBatch(topic = topicNameWithCustomConfigs)
+    verifyListOffsets(topic = topicNameWithCustomConfigs)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer(quorum: String): Unit = {
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInSeparateBatch(topic = topicNameWithCustomConfigs)
+    verifyListOffsets(topic = topicNameWithCustomConfigs)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testThreeCompressedRecordsInSeparateBatch(quorum: String): Unit = {
+    produceMessagesInSeparateBatch("gzip")
+    verifyListOffsets()
+
+    // test LogAppendTime case
+    val props: Properties = new Properties()
+    props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
+    createTopicWithConfig(topicNameWithCustomConfigs, props)
+    produceMessagesInSeparateBatch("gzip", topicNameWithCustomConfigs)
+    // In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
+    // So in this separate batch test, it'll be the last offset 2
+    verifyListOffsets(topic = topicNameWithCustomConfigs, 2)
+  }
+
+  private def createOldMessageFormatBrokers(): Unit = {
+    setOldMessageFormat = true
+    recreateBrokers(reconfigure = true, startup = true)
+    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+    adminClient = Admin.create(Map[String, Object](
+      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
+    ).asJava)
+  }
+
+  private def createTopicWithConfig(topic: String, props: Properties): Unit = {
+    createTopic(topic, 1, 1.toShort, topicConfig = props)
+  }
+
+  private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
     assertEquals(0, earliestOffset.offset())
-  }
 
-  @Test
-  def testLatestOffset(): Unit = {
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
     assertEquals(3, latestOffset.offset())
-  }
 
-  @Test
-  def testMaxTimestampOffset(): Unit = {
-    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
-    assertEquals(1, maxTimestampOffset.offset())
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
+    assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
   }
 
   private def runFetchOffsets(adminClient: Admin,
-                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
-    val tp = new TopicPartition(topicName, 0)
+                              offsetSpec: OffsetSpec,
+                              topic: String): ListOffsetsResult.ListOffsetsResultInfo = {
+    val tp = new TopicPartition(topic, 0)
     adminClient.listOffsets(Map(
       tp -> offsetSpec
     ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }
 
-  def produceMessages(): Unit = {
+  private def produceMessagesInOneBatch(compressionType: String = "none", topic: String = topicName): Unit = {
     val records = Seq(
-      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
-        null, new Array[Byte](10000)),
-      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
-        null, new Array[Byte](10000)),
-      new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
-        null, new Array[Byte](10000)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 100L,
+        null, new Array[Byte](10)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 999L,
+        null, new Array[Byte](10)),
+      new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 200L,
+        null, new Array[Byte](10)),
     )
-    TestUtils.produceMessages(servers, records, -1)
+    // create a producer with large linger.ms and enough batch.size (default is enough for three 10 bytes records),
+    // so that we can confirm all records will be accumulated in producer until we flush them into one batch.
+    val producer = createProducer(
+      plaintextBootstrapServers(brokers),
+      deliveryTimeoutMs = Int.MaxValue,
+      lingerMs = Int.MaxValue,
+      compressionType = compressionType)
+
+    try {
+      val futures = records.map(producer.send)
+      producer.flush()
+      futures.foreach(_.get)
+    } finally {
+      producer.close()
+    }
+  }
 
-  def generateConfigs: Seq[KafkaConfig] =
-    TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+  private def produceMessagesInSeparateBatch(compressionType: String = "none", topic: String = topicName): Unit = {
+    val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 100L,
+      null, new Array[Byte](10)))
+    val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 999L,
+      null, new Array[Byte](10)))
+    val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, 200L,
+      null, new Array[Byte](10)))
+
+    val producer = createProducer(
+      plaintextBootstrapServers(brokers),
+      compressionType = compressionType)
+    try {
+      val futures = records.map(producer.send)
+      futures.foreach(_.get)
+      // advance the server time after each record sent to make sure the time changed when appendTime is used
+      mockTime.sleep(100)
+      val futures2 = records2.map(producer.send)
+      futures2.foreach(_.get)
+      mockTime.sleep(100)
+      val futures3 = records3.map(producer.send)
+      futures3.foreach(_.get)
+    } finally {
+      producer.close()
+    }
+  }
+
+  def generateConfigs: Seq[KafkaConfig] = {
+    TestUtils.createBrokerConfigs(1, zkConnectOrNull).map{ props =>
+      if (setOldMessageFormat) {
+        props.setProperty("log.message.format.version", "0.10.0")
+        props.setProperty("inter.broker.protocol.version", "0.10.0")
+      }
+      props
+    }.map(KafkaConfig.fromProps)
+  }
 }
@@ -98,7 +98,7 @@ class KRaftQuorumImplementation(
   ): KafkaBroker = {
     val sharedServer = new SharedServer(config,
       new MetaProperties(clusterId, config.nodeId),
-      Time.SYSTEM,
+      time,
       new Metrics(),
       controllerQuorumVotersFuture,
       faultHandlerFactory)
@@ -219,8 +219,8 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"The offset of max timestamp should be ${records.records.asScala.size - 1}")
+    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size may have been changed")
@@ -271,8 +271,8 @@ class LogValidatorTest {
       "MessageSet should still valid")
     assertEquals(now, validatedResults.maxTimestampMs,
       s"Max timestamp should be $now")
-    assertEquals(records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"The offset of max timestamp should be ${records.records.asScala.size - 1}")
+    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"The offset of max timestamp should be 0 if logAppendTime is used")
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
@@ -341,6 +341,7 @@ class LogValidatorTest {
 
   private def checkNonCompressed(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
+    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)
 
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -420,6 +421,7 @@ class LogValidatorTest {
 
   private def checkRecompression(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
+    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)
 
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -473,8 +475,8 @@ class LogValidatorTest {
     }
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    assertEquals(2, validatingResults.shallowOffsetOfMaxTimestampMs,
-      "Offset of max timestamp should be 2")
+    assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+      "Offset of max timestamp should be 1")
     assertTrue(validatingResults.messageSizeMaybeChanged,
       "Message size should have been changed")
@@ -525,8 +527,8 @@ class LogValidatorTest {
     }
     assertEquals(validatedResults.maxTimestampMs, RecordBatch.NO_TIMESTAMP,
      s"Max timestamp should be ${RecordBatch.NO_TIMESTAMP}")
-    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
+    assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be -1")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
 
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -572,8 +574,8 @@ class LogValidatorTest {
       assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence)
     }
     assertEquals(timestamp, validatedResults.maxTimestampMs)
-    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
+    assertEquals(0, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be 0 when multiple records having the same max timestamp.")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
 
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
@@ -587,6 +589,7 @@ class LogValidatorTest {
 
   private def checkCompressed(magic: Byte): Unit = {
     val now = System.currentTimeMillis()
+    // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
     val timestampSeq = Seq(now - 1, now + 1, now)
 
     val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
@@ -639,8 +642,8 @@ class LogValidatorTest {
       }
     }
     assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
-    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
-      s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
+    assertEquals(1, validatedResults.shallowOffsetOfMaxTimestampMs,
+      s"Offset of max timestamp should be 1")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
 
     verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
@@ -329,6 +329,8 @@ public class LogValidator {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         LongRef expectedInnerOffset = PrimitiveRef.ofLong(0);
         List<Record> validatedRecords = new ArrayList<>();
+        long offsetOfMaxTimestamp = -1;
+        long initialOffset = offsetCounter.value;
 
         int uncompressedSizeInBytes = 0;
@@ -378,8 +380,11 @@ public class LogValidator {
                     && batch.magic() > RecordBatch.MAGIC_VALUE_V0
                     && toMagic > RecordBatch.MAGIC_VALUE_V0) {
 
-                if (record.timestamp() > maxTimestamp)
+                if (record.timestamp() > maxTimestamp) {
                     maxTimestamp = record.timestamp();
+                    // The offset is only increased when it is a valid record
+                    offsetOfMaxTimestamp = initialOffset + validatedRecords.size();
+                }
 
                 // Some older clients do not implement the V1 internal offsets correctly.
                 // Historically the broker handled this by rewriting the batches rather
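Distilled, the fix makes the validator capture the offset at the moment a new max timestamp is seen; because the comparison is a strict '>', the first offset wins when several records share the max timestamp. A self-contained sketch of just that rule (not the actual LogValidator code):

public class MaxTimestampSketch {
    // Returns {maxTimestamp, offsetOfMaxTimestamp} for the given per-record timestamps.
    static long[] maxTimestampAndOffset(long[] timestamps, long initialOffset) {
        long maxTimestamp = -1L;          // RecordBatch.NO_TIMESTAMP
        long offsetOfMaxTimestamp = -1L;  // stays -1 if no record carries a timestamp
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] > maxTimestamp) { // strict '>': the first offset with the max wins
                maxTimestamp = timestamps[i];
                offsetOfMaxTimestamp = initialOffset + i;
            }
        }
        return new long[] {maxTimestamp, offsetOfMaxTimestamp};
    }

    public static void main(String[] args) {
        // Same timestamps as the integration tests: 100, 999, 200 -> max 999 at offset 1
        long[] r = maxTimestampAndOffset(new long[] {100L, 999L, 200L}, 0L);
        System.out.println(r[0] + " @ " + r[1]); // prints "999 @ 1"
    }
}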
@@ -416,8 +421,10 @@ public class LogValidator {
         long lastOffset = offsetCounter.value - 1;
         firstBatch.setLastOffset(lastOffset);
 
-        if (timestampType == TimestampType.LOG_APPEND_TIME)
+        if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
+            offsetOfMaxTimestamp = initialOffset;
+        }
 
         if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
             firstBatch.setMaxTimestamp(timestampType, maxTimestamp);
@@ -430,7 +437,7 @@ public class LogValidator {
             now,
             records,
             maxTimestamp,
-            lastOffset,
+            offsetOfMaxTimestamp,
             false,
             recordConversionStats);
     }