MINOR: Small LogValidator clean ups (#14697)

This patch contains a few small clean-ups in LogValidator and associated classes:

1. Set shallowOffsetOfMaxTimestamp consistently to the last offset in the
   batch for v2 compressed and non-compressed data (a short sketch of the
   expected behavior follows this list).
2. Rename `RecordConversionStats` to `RecordValidationStats`, since one of its
   fields, `temporaryMemoryBytes`, does not depend on conversion.
3. Rename `batchIndex` to `recordIndex` in loops over the records in each batch
   inside `LogValidator`.
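
To make item 1 concrete, here is a minimal, self-contained sketch (illustrative values only, not code from this patch) of what the validator is expected to report for a v2 batch:

```java
public class ShallowOffsetSketch {
    public static void main(String[] args) {
        // Three records appended as a single v2 batch starting at offset 0.
        long[] timestamps = {1000L, 1002L, 1001L}; // the max timestamp sits at relative offset 1
        long baseOffset = 0L;
        long lastOffsetOfBatch = baseOffset + timestamps.length - 1;

        // For v2 data, compressed or not, the validator reports the batch's last offset
        // as shallowOffsetOfMaxTimestamp rather than the offset of the individual record.
        long shallowOffsetOfMaxTimestamp = lastOffsetOfBatch;
        System.out.println(shallowOffsetOfMaxTimestamp); // prints 2
    }
}
```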

Reviewers: Qichao Chu <5326144+ex172000@users.noreply.github.com>, Jun Rao <junrao@gmail.com>
Author: Jason Gustafson (committed via GitHub)
Date: 2023-11-20 10:40:45 -08:00
Commit: e905ef1edf
Parent: 4eb8ae68ca
16 changed files with 140 additions and 116 deletions

View File

@@ -19,18 +19,18 @@ package org.apache.kafka.common.record;
 public class ConvertedRecords<T extends Records> {
     private final T records;
-    private final RecordConversionStats recordConversionStats;
+    private final RecordValidationStats recordValidationStats;

-    public ConvertedRecords(T records, RecordConversionStats recordConversionStats) {
+    public ConvertedRecords(T records, RecordValidationStats recordValidationStats) {
         this.records = records;
-        this.recordConversionStats = recordConversionStats;
+        this.recordValidationStats = recordValidationStats;
     }

     public T records() {
         return records;
     }

-    public RecordConversionStats recordConversionStats() {
-        return recordConversionStats;
+    public RecordValidationStats recordConversionStats() {
+        return recordValidationStats;
     }
 }

View File

@@ -284,7 +284,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             // are not enough available bytes in the response to read it fully. Note that this is
             // only possible prior to KIP-74, after which the broker was changed to always return at least
             // one full record batch, even if it requires exceeding the max fetch size requested by the client.
-            return new ConvertedRecords<>(this, RecordConversionStats.EMPTY);
+            return new ConvertedRecords<>(this, RecordValidationStats.EMPTY);
         } else {
             return convertedRecords;
         }

View File

@@ -36,14 +36,14 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
     private static final int MAX_READ_SIZE = 128 * 1024;
     static final int MIN_OVERFLOW_MESSAGE_LENGTH = Records.LOG_OVERHEAD;

-    private RecordConversionStats recordConversionStats;
+    private RecordValidationStats recordValidationStats;
     private RecordsSend convertedRecordsWriter;
     private Iterator<ConvertedRecords<?>> convertedRecordsIterator;

     public LazyDownConversionRecordsSend(LazyDownConversionRecords records) {
         super(records, records.sizeInBytes());
         convertedRecordsWriter = null;
-        recordConversionStats = new RecordConversionStats();
+        recordValidationStats = new RecordValidationStats();
         convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
     }
@@ -77,7 +77,7 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
             // Get next chunk of down-converted messages
             ConvertedRecords<?> recordsAndStats = convertedRecordsIterator.next();
             convertedRecords = (MemoryRecords) recordsAndStats.records();
-            recordConversionStats.add(recordsAndStats.recordConversionStats());
+            recordValidationStats.add(recordsAndStats.recordConversionStats());
             log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
         } else {
             convertedRecords = buildOverflowBatch(remaining);
@@ -97,8 +97,8 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
         return (int) convertedRecordsWriter.writeTo(channel);
     }

-    public RecordConversionStats recordConversionStats() {
-        return recordConversionStats;
+    public RecordValidationStats recordConversionStats() {
+        return recordValidationStats;
     }

     public TopicPartition topicPartition() {

View File

@@ -37,7 +37,7 @@ public class MultiRecordsSend implements Send {
     private final Queue<Send> sendQueue;
     private final long size;
-    private Map<TopicPartition, RecordConversionStats> recordConversionStats;
+    private Map<TopicPartition, RecordValidationStats> recordConversionStats;

     private long totalWritten = 0;
     private Send current;
@@ -114,7 +114,7 @@ public class MultiRecordsSend implements Send {
      * Get any statistics that were recorded as part of executing this {@link MultiRecordsSend}.
      * @return Records processing statistics (could be null if no statistics were collected)
      */
-    public Map<TopicPartition, RecordConversionStats> recordConversionStats() {
+    public Map<TopicPartition, RecordValidationStats> recordConversionStats() {
         return recordConversionStats;
     }

View File

@@ -16,25 +16,30 @@
  */
 package org.apache.kafka.common.record;

-public class RecordConversionStats {
+/**
+ * This class tracks resource usage during broker record validation for eventual reporting in metrics.
+ * Record validation covers integrity checks on inbound data (e.g. checksum verification), structural
+ * validation to make sure that records are well-formed, and conversion between record formats if needed.
+ */
+public class RecordValidationStats {

-    public static final RecordConversionStats EMPTY = new RecordConversionStats();
+    public static final RecordValidationStats EMPTY = new RecordValidationStats();

     private long temporaryMemoryBytes;
     private int numRecordsConverted;
     private long conversionTimeNanos;

-    public RecordConversionStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
+    public RecordValidationStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
         this.temporaryMemoryBytes = temporaryMemoryBytes;
         this.numRecordsConverted = numRecordsConverted;
         this.conversionTimeNanos = conversionTimeNanos;
     }

-    public RecordConversionStats() {
+    public RecordValidationStats() {
         this(0, 0, 0);
     }

-    public void add(RecordConversionStats stats) {
+    public void add(RecordValidationStats stats) {
         temporaryMemoryBytes += stats.temporaryMemoryBytes;
         numRecordsConverted += stats.numRecordsConverted;
         conversionTimeNanos += stats.conversionTimeNanos;
@@ -64,7 +69,7 @@ public class RecordConversionStats {
     @Override
     public String toString() {
-        return String.format("RecordConversionStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)",
+        return String.format("RecordValidationStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)",
                 temporaryMemoryBytes, numRecordsConverted, conversionTimeNanos);
     }
 }
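
The javadoc added above spells out what the renamed class covers. As a usage sketch (a hypothetical caller; the constructors, `add`, and `toString` match the class shown in this diff), per-batch stats accumulate like this:

```java
import org.apache.kafka.common.record.RecordValidationStats;

public class ValidationStatsSketch {
    public static void main(String[] args) {
        RecordValidationStats total = new RecordValidationStats();  // starts at (0, 0, 0)
        total.add(new RecordValidationStats(1024L, 0, 50_000L));    // batch validated in place, nothing converted
        total.add(new RecordValidationStats(4096L, 3, 200_000L));   // down-converted batch: 3 records rewritten
        // Prints the accumulated temporaryMemoryBytes, numRecordsConverted and conversionTimeNanos.
        System.out.println(total);
    }
}
```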

View File

@@ -96,7 +96,7 @@ public class RecordsUtil {
         }

         buffer.flip();
-        RecordConversionStats stats = new RecordConversionStats(temporaryMemoryBytes, numRecordsConverted,
+        RecordValidationStats stats = new RecordValidationStats(temporaryMemoryBytes, numRecordsConverted,
                 time.nanoseconds() - startNanos);
         return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
     }

View File

@@ -801,7 +801,7 @@ public class MemoryRecordsBuilderTest {
         }
     }

-    private void verifyRecordsProcessingStats(CompressionType compressionType, RecordConversionStats processingStats,
+    private void verifyRecordsProcessingStats(CompressionType compressionType, RecordValidationStats processingStats,
                                               int numRecords, int numRecordsConverted, long finalBytes,
                                               long preConvertedBytes) {
         assertNotNull(processingStats, "Records processing info is null");

View File

@@ -818,7 +818,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
       appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs)
       appendInfo.setLastOffset(offset.value - 1)
-      appendInfo.setRecordConversionStats(validateAndOffsetAssignResult.recordConversionStats)
+      appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
       if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
         appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs)
@@ -1188,7 +1188,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
        OptionalInt.empty()

     new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, offsetOfMaxTimestamp,
-      RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, sourceCompression,
+      RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression,
       validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
   }

View File

@@ -111,7 +111,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   val clientMetricsManager: Option[ClientMetricsManager]
 ) extends ApiRequestHandler with Logging {

-  type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
+  type FetchResponseStats = Map[TopicPartition, RecordValidationStats]
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
   val configHelper = new ConfigHelper(metadataCache, config, configRepository)
   val authHelper = new AuthHelper(authorizer)
@@ -722,7 +722,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         entriesPerPartition = authorizedRequestInfo,
         requestLocal = requestLocal,
         responseCallback = sendResponseCallback,
-        recordConversionStatsCallback = processingStatsCallback,
+        recordValidationStatsCallback = processingStatsCallback,
         transactionalId = produceRequest.transactionalId()
       )
@@ -3761,7 +3761,7 @@ class KafkaApis(val requestChannel: RequestChannel,

   private def updateRecordConversionStats(request: RequestChannel.Request,
                                           tp: TopicPartition,
-                                          conversionStats: RecordConversionStats): Unit = {
+                                          conversionStats: RecordValidationStats): Unit = {
     val conversionCount = conversionStats.numRecordsConverted
     if (conversionCount > 0) {
       request.header.apiKey match {

View File

@@ -763,7 +763,7 @@ class ReplicaManager(val config: KafkaConfig,
   * @param entriesPerPartition the records per partition to be appended
   * @param responseCallback callback for sending the response
   * @param delayedProduceLock lock for the delayed actions
-   * @param recordConversionStatsCallback callback for updating stats on record conversions
+   * @param recordValidationStatsCallback callback for updating stats on record conversions
   * @param requestLocal container for the stateful instances scoped to this request
   * @param transactionalId transactional ID if the request is from a producer and the producer is transactional
   * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
@@ -775,7 +775,7 @@ class ReplicaManager(val config: KafkaConfig,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                     delayedProduceLock: Option[Lock] = None,
-                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
+                    recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
                     requestLocal: RequestLocal = RequestLocal.NoCaching,
                     transactionalId: String = null,
                     actionQueue: ActionQueue = this.defaultActionQueue): Unit = {
@@ -795,7 +795,7 @@ class ReplicaManager(val config: KafkaConfig,
     if (notYetVerifiedEntriesPerPartition.isEmpty || addPartitionsToTxnManager.isEmpty) {
       appendEntries(verifiedEntriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,
-        errorsPerPartition, recordConversionStatsCallback, timeout, responseCallback, delayedProduceLock, actionQueue)(requestLocal, Map.empty)
+        errorsPerPartition, recordValidationStatsCallback, timeout, responseCallback, delayedProduceLock, actionQueue)(requestLocal, Map.empty)
     } else {
       // For unverified entries, send a request to verify. When verified, the append process will proceed via the callback.
       // We verify above that all partitions use the same producer ID.
@@ -813,7 +813,7 @@ class ReplicaManager(val config: KafkaConfig,
         requiredAcks,
         verificationGuards.toMap,
         errorsPerPartition,
-        recordConversionStatsCallback,
+        recordValidationStatsCallback,
         timeout,
         responseCallback,
         delayedProduceLock,
@@ -847,7 +847,7 @@ class ReplicaManager(val config: KafkaConfig,
                             requiredAcks: Short,
                             verificationGuards: Map[TopicPartition, VerificationGuard],
                             errorsPerPartition: Map[TopicPartition, Errors],
-                            recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit,
+                            recordConversionStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit,
                             timeout: Long,
                             responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                             delayedProduceLock: Option[Lock],
@@ -920,7 +920,7 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }

-    recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
+    recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordValidationStats })

     if (delayedProduceRequestRequired(requiredAcks, allEntries, allResults)) {
       // create delayed produce operation

View File

@@ -28,7 +28,7 @@ import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordConversionStats}
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
@@ -177,7 +177,7 @@ object AbstractCoordinatorConcurrencyTest {
                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
                               responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                               delayedProduceLock: Option[Lock] = None,
-                              processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
+                              processingStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
                               requestLocal: RequestLocal = RequestLocal.NoCaching,
                               transactionalId: String = null,
                               actionQueue: ActionQueue = null): Unit = {

View File

@@ -177,7 +177,7 @@ class LogValidatorTest {
     val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0
     assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs,
       s"The offset of max timestamp should be $expectedMaxTimestampOffset")
-    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
+    verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
       compressed = false)
   }
@@ -224,8 +224,8 @@ class LogValidatorTest {
     assertTrue(validatedResults.messageSizeMaybeChanged,
       "Message size may have been changed")

-    val stats = validatedResults.recordConversionStats
-    verifyRecordConversionStats(stats, numConvertedRecords = 3, records, compressed = true)
+    val stats = validatedResults.recordValidationStats
+    verifyRecordValidationStats(stats, numConvertedRecords = 3, records, compressed = true)
   }

   @Test
@@ -276,7 +276,7 @@ class LogValidatorTest {
     assertFalse(validatedResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
-    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
+    verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
       compressed = true)
   }
@@ -350,11 +350,14 @@ class LogValidatorTest {
       (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
         RecordBatch.NO_PARTITION_LEADER_EPOCH)

-    val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId,
-      producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional,
+    val recordList = List(
       new SimpleRecord(timestampSeq(0), "hello".getBytes),
       new SimpleRecord(timestampSeq(1), "there".getBytes),
-      new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
+      new SimpleRecord(timestampSeq(2), "beautiful".getBytes)
+    )
+
+    val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId,
+      producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, recordList: _*)

     val offsetCounter = PrimitiveRef.ofLong(0);
     val validatingResults = new LogValidator(records,
@@ -399,12 +402,20 @@ class LogValidatorTest {
     assertEquals(i, offsetCounter.value);
     assertEquals(now + 1, validatingResults.maxTimestampMs,
       s"Max timestamp should be ${now + 1}")
-    assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs,
+
+    val expectedShallowOffsetOfMaxTimestamp = if (magic >= RecordVersion.V2.value) {
+      // v2 records are always batched, even when not compressed.
+      // the shallow offset of max timestamp is the last offset of the batch
+      recordList.size - 1
+    } else {
+      1
+    }
+    assertEquals(expectedShallowOffsetOfMaxTimestamp, validatingResults.shallowOffsetOfMaxTimestampMs,
       s"Offset of max timestamp should be 1")
     assertFalse(validatingResults.messageSizeMaybeChanged,
       "Message size should not have been changed")
-    verifyRecordConversionStats(validatingResults.recordConversionStats, numConvertedRecords = 0, records,
+    verifyRecordValidationStats(validatingResults.recordValidationStats, numConvertedRecords = 0, records,
       compressed = false)
   }
@@ -478,7 +489,7 @@ class LogValidatorTest {
     assertTrue(validatingResults.messageSizeMaybeChanged,
       "Message size should have been changed")
-    verifyRecordConversionStats(validatingResults.recordConversionStats, numConvertedRecords = 3, records,
+    verifyRecordValidationStats(validatingResults.recordValidationStats, numConvertedRecords = 3, records,
       compressed = true)
   }
@@ -529,7 +540,7 @@ class LogValidatorTest {
       s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
-    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
+    verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
      compressed = true)
   }
@@ -576,7 +587,7 @@ class LogValidatorTest {
       s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
     assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed")
-    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
+    verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
       compressed = true)
   }
@@ -596,11 +607,14 @@ class LogValidatorTest {
     (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
       RecordBatch.NO_PARTITION_LEADER_EPOCH)

-    val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId,
-      producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional,
+    val recordList = List(
       new SimpleRecord(timestampSeq(0), "hello".getBytes),
       new SimpleRecord(timestampSeq(1), "there".getBytes),
-      new SimpleRecord(timestampSeq(2), "beautiful".getBytes))
+      new SimpleRecord(timestampSeq(2), "beautiful".getBytes)
+    )
+
+    val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId,
+      producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, recordList: _*)

     val validatedResults = new LogValidator(records,
       topicPartition,
@@ -639,11 +653,15 @@ class LogValidatorTest {
       }
     }
     assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}")
-    assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs,
+
+    // All versions have an outer batch when compressed, so the shallow offset
+    // of max timestamp is always the offset of the last record in the batch.
+    val expectedShallowOffsetOfMaxTimestamp = recordList.size - 1
+    assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestampMs,
       s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}")
     assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed")
-    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records,
+    verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
      compressed = true)
   }
@@ -926,7 +944,7 @@ class LogValidatorTest {
       PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
     )
     checkOffsets(validatedResults.validatedRecords, offset)
-    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
+    verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
      compressed = false)
   }
@@ -952,7 +970,7 @@ class LogValidatorTest {
       PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
     )
     checkOffsets(validatedResults.validatedRecords, offset)
-    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
+    verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
      compressed = false)
   }
@@ -978,7 +996,7 @@ class LogValidatorTest {
       PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
     )
     checkOffsets(validatedResults.validatedRecords, offset)
-    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
+    verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
      compressed = true)
   }
@@ -1004,7 +1022,7 @@ class LogValidatorTest {
       PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier
     )
     checkOffsets(validatedResults.validatedRecords, offset)
-    verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records,
+    verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records,
      compressed = true)
   }
@@ -1641,7 +1659,7 @@ class LogValidatorTest {
     }
   }

-  def verifyRecordConversionStats(stats: RecordConversionStats, numConvertedRecords: Int, records: MemoryRecords,
+  def verifyRecordValidationStats(stats: RecordValidationStats, numConvertedRecords: Int, records: MemoryRecords,
                                   compressed: Boolean): Unit = {
     assertNotNull(stats, "Records processing info is null")
     assertEquals(numConvertedRecords, stats.numRecordsConverted)

View File

@@ -109,7 +109,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
        offsetOfMaxTimestamp,
        Time.SYSTEM.milliseconds(),
        state.logStartOffset,
-        RecordConversionStats.EMPTY,
+        RecordValidationStats.EMPTY,
        CompressionType.NONE,
        FetchResponse.recordsSize(partitionData),
        batches.headOption.map(_.lastOffset).getOrElse(-1)))

View File

@@ -30,13 +30,13 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordConversionStats, SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.utils.{LogContext, SystemTime}
 import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
-import org.apache.kafka.storage.internals.log.{LogAppendInfo}
+import org.apache.kafka.storage.internals.log.LogAppendInfo
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
@@ -772,7 +772,7 @@ class ReplicaFetcherThreadTest {
       -1L,
       RecordBatch.NO_TIMESTAMP,
       -1L,
-      RecordConversionStats.EMPTY,
+      RecordValidationStats.EMPTY,
       CompressionType.NONE,
       -1, // No records.
       -1L

View File

@@ -18,7 +18,7 @@ package org.apache.kafka.storage.internals.log;

 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.RecordConversionStats;
+import org.apache.kafka.common.record.RecordValidationStats;
 import org.apache.kafka.common.requests.ProduceResponse.RecordError;

 import java.util.Collections;
@@ -32,7 +32,7 @@ public class LogAppendInfo {

     public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(-1, -1, OptionalInt.empty(),
             RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
-            RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L);
+            RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);

     private long firstOffset;
     private long lastOffset;
@@ -40,7 +40,7 @@ public class LogAppendInfo {
     private long offsetOfMaxTimestamp;
     private long logAppendTime;
     private long logStartOffset;
-    private RecordConversionStats recordConversionStats;
+    private RecordValidationStats recordValidationStats;

     private final OptionalInt lastLeaderEpoch;
     private final CompressionType sourceCompression;
@@ -60,7 +60,7 @@ public class LogAppendInfo {
     * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
     * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
     * @param logStartOffset The start offset of the log at the time of this append.
-     * @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
+     * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
     * @param sourceCompression The source codec used in the message set (send by the producer)
     * @param validBytes The number of valid bytes
     * @param lastOffsetOfFirstBatch The last offset of the first batch
@@ -72,12 +72,12 @@ public class LogAppendInfo {
                          long offsetOfMaxTimestamp,
                          long logAppendTime,
                          long logStartOffset,
-                         RecordConversionStats recordConversionStats,
+                         RecordValidationStats recordValidationStats,
                          CompressionType sourceCompression,
                          int validBytes,
                          long lastOffsetOfFirstBatch) {
         this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset,
-                recordConversionStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.<RecordError>emptyList(),
+                recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.<RecordError>emptyList(),
                 LeaderHwChange.NONE);
     }
@@ -92,7 +92,7 @@ public class LogAppendInfo {
     * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
     * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
     * @param logStartOffset The start offset of the log at the time of this append.
-     * @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
+     * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false`
     * @param sourceCompression The source codec used in the message set (send by the producer)
     * @param validBytes The number of valid bytes
     * @param lastOffsetOfFirstBatch The last offset of the first batch
@@ -107,7 +107,7 @@ public class LogAppendInfo {
                          long offsetOfMaxTimestamp,
                          long logAppendTime,
                          long logStartOffset,
-                         RecordConversionStats recordConversionStats,
+                         RecordValidationStats recordValidationStats,
                          CompressionType sourceCompression,
                          int validBytes,
                          long lastOffsetOfFirstBatch,
@@ -120,7 +120,7 @@ public class LogAppendInfo {
         this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
         this.logAppendTime = logAppendTime;
         this.logStartOffset = logStartOffset;
-        this.recordConversionStats = recordConversionStats;
+        this.recordValidationStats = recordValidationStats;
         this.sourceCompression = sourceCompression;
         this.validBytes = validBytes;
         this.lastOffsetOfFirstBatch = lastOffsetOfFirstBatch;
@@ -180,12 +180,12 @@ public class LogAppendInfo {
         this.logStartOffset = logStartOffset;
     }

-    public RecordConversionStats recordConversionStats() {
-        return recordConversionStats;
+    public RecordValidationStats recordValidationStats() {
+        return recordValidationStats;
     }

-    public void setRecordConversionStats(RecordConversionStats recordConversionStats) {
-        this.recordConversionStats = recordConversionStats;
+    public void setRecordValidationStats(RecordValidationStats recordValidationStats) {
+        this.recordValidationStats = recordValidationStats;
     }

     public CompressionType sourceCompression() {
@@ -233,13 +233,13 @@ public class LogAppendInfo {
     * @return a new instance with the given LeaderHwChange
     */
     public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
-        return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, recordConversionStats,
+        return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
                 sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange);
     }

     public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStartOffset) {
         return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-                RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L);
+                RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
     }

     /**
@@ -249,7 +249,7 @@ public class LogAppendInfo {
     */
     public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List<RecordError> recordErrors) {
         return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
-                RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE);
+                RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE);
     }

     @Override
@@ -262,7 +262,7 @@ public class LogAppendInfo {
             ", offsetOfMaxTimestamp=" + offsetOfMaxTimestamp +
             ", logAppendTime=" + logAppendTime +
             ", logStartOffset=" + logStartOffset +
-            ", recordConversionStats=" + recordConversionStats +
+            ", recordConversionStats=" + recordValidationStats +
             ", sourceCompression=" + sourceCompression +
             ", validBytes=" + validBytes +
             ", lastOffsetOfFirstBatch=" + lastOffsetOfFirstBatch +

View File

@@ -38,7 +38,7 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder.RecordsInfo;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.RecordConversionStats;
+import org.apache.kafka.common.record.RecordValidationStats;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ProduceResponse.RecordError;
 import org.apache.kafka.common.utils.BufferSupplier;
@@ -70,17 +70,17 @@ public class LogValidator {
         public final long maxTimestampMs;
         public final long shallowOffsetOfMaxTimestampMs;
         public final boolean messageSizeMaybeChanged;
-        public final RecordConversionStats recordConversionStats;
+        public final RecordValidationStats recordValidationStats;

         public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs,
                                 long shallowOffsetOfMaxTimestampMs, boolean messageSizeMaybeChanged,
-                                RecordConversionStats recordConversionStats) {
+                                RecordValidationStats recordValidationStats) {
             this.logAppendTimeMs = logAppendTimeMs;
             this.validatedRecords = validatedRecords;
             this.maxTimestampMs = maxTimestampMs;
             this.shallowOffsetOfMaxTimestampMs = shallowOffsetOfMaxTimestampMs;
             this.messageSizeMaybeChanged = messageSizeMaybeChanged;
-            this.recordConversionStats = recordConversionStats;
+            this.recordValidationStats = recordValidationStats;
         }
     }
@@ -208,16 +208,16 @@ public class LogValidator {
             validateBatch(topicPartition, firstBatch, batch, origin, toMagic, metricsRecorder);

             List<ApiRecordError> recordErrors = new ArrayList<>(0);
-            int batchIndex = 0;
+            int recordIndex = 0;
             for (Record record : batch) {
                 Optional<ApiRecordError> recordError = validateRecord(batch, topicPartition,
-                    record, batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic,
+                    record, recordIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic,
                     metricsRecorder);
                 recordError.ifPresent(e -> recordErrors.add(e));
                 // we fail the batch if any record fails, so we stop appending if any record fails
                 if (recordErrors.isEmpty())
                     builder.appendWithOffset(offsetCounter.value++, record);
-                ++batchIndex;
+                ++recordIndex;
             }

             processRecordErrors(recordErrors);
@@ -226,7 +226,7 @@ public class LogValidator {
         MemoryRecords convertedRecords = builder.build();

         RecordsInfo info = builder.info();
-        RecordConversionStats recordConversionStats = new RecordConversionStats(
+        RecordValidationStats recordValidationStats = new RecordValidationStats(
             builder.uncompressedBytesWritten(), builder.numRecords(), time.nanoseconds() - startNanos);
         return new ValidationResult(
             now,
@@ -234,7 +234,7 @@ public class LogValidator {
             info.maxTimestamp,
             info.shallowOffsetOfMaxTimestamp,
             true,
-            recordConversionStats);
+            recordValidationStats);
     }

     // Visible for benchmarking
@@ -257,18 +257,18 @@
         // This is a hot path and we want to avoid any unnecessary allocations.
         // That said, there is no benefit in using `skipKeyValueIterator` for the uncompressed
         // case since we don't do key/value copies in this path (we just slice the ByteBuffer)
-        int batchIndex = 0;
+        int recordIndex = 0;
         for (Record record : batch) {
             Optional<ApiRecordError> recordError = validateRecord(batch, topicPartition, record,
-                batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder);
-            recordError.ifPresent(e -> recordErrors.add(e));
+                recordIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder);
+            recordError.ifPresent(recordErrors::add);

             long offset = offsetCounter.value++;
             if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && record.timestamp() > maxBatchTimestamp) {
                 maxBatchTimestamp = record.timestamp();
                 offsetOfMaxBatchTimestamp = offset;
             }
-            ++batchIndex;
+            ++recordIndex;
         }

         processRecordErrors(recordErrors);
@@ -293,19 +293,20 @@
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
-                offsetOfMaxTimestamp = offsetCounter.value - 1;
-            else
-                offsetOfMaxTimestamp = initialOffset;
+            offsetOfMaxTimestamp = initialOffset;
         }
+
+        if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
+            offsetOfMaxTimestamp = offsetCounter.value - 1;
+        }

         return new ValidationResult(
             now,
             records,
             maxTimestamp,
             offsetOfMaxTimestamp,
             false,
-            RecordConversionStats.EMPTY);
+            RecordValidationStats.EMPTY);
     }

     /**
@@ -362,15 +363,15 @@
         try {
             List<ApiRecordError> recordErrors = new ArrayList<>(0);
             // this is a hot path and we want to avoid any unnecessary allocations.
-            int batchIndex = 0;
+            int recordIndex = 0;
             while (recordsIterator.hasNext()) {
                 Record record = recordsIterator.next();
                 long expectedOffset = expectedInnerOffset.value++;
                 Optional<ApiRecordError> recordError = validateRecordCompression(sourceCompression,
-                    batchIndex, record);
+                    recordIndex, record);
                 if (!recordError.isPresent()) {
-                    recordError = validateRecord(batch, topicPartition, record, batchIndex, now,
+                    recordError = validateRecord(batch, topicPartition, record, recordIndex, now,
                         timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder);
                 }
@@ -396,7 +397,7 @@
                     validatedRecords.add(record);
                 }
-                ++batchIndex;
+                ++recordIndex;
             }

             processRecordErrors(recordErrors);
@@ -425,14 +426,14 @@
             if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
                 firstBatch.setPartitionLeaderEpoch(partitionLeaderEpoch);

-            RecordConversionStats recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0);
+            RecordValidationStats recordValidationStats = new RecordValidationStats(uncompressedSizeInBytes, 0, 0);
             return new ValidationResult(
                 now,
                 records,
                 maxTimestamp,
                 lastOffset,
                 false,
-                recordConversionStats);
+                recordValidationStats);
         }
     }
@@ -464,7 +465,7 @@
         // message format V0 or if the inner offsets are not consecutive. This is OK since the impact is the same: we have
         // to rebuild the records (including recompression if enabled).
         int conversionCount = builder.numRecords();
-        RecordConversionStats recordConversionStats = new RecordConversionStats(
+        RecordValidationStats recordValidationStats = new RecordValidationStats(
             uncompressedSizeInBytes + builder.uncompressedBytesWritten(), conversionCount,
             time.nanoseconds() - startNanos);
@@ -474,7 +475,7 @@
             info.maxTimestamp,
             info.shallowOffsetOfMaxTimestamp,
             true,
-            recordConversionStats);
+            recordValidationStats);
     }
@@ -537,7 +538,7 @@
     private static Optional<ApiRecordError> validateRecord(RecordBatch batch,
                                                            TopicPartition topicPartition,
                                                            Record record,
-                                                           int batchIndex,
+                                                           int recordIndex,
                                                            long now,
                                                            TimestampType timestampType,
                                                            long timestampBeforeMaxMs,
@@ -547,7 +548,7 @@
         if (!record.hasMagic(batch.magic())) {
             metricsRecorder.recordInvalidMagic();
             return Optional.of(new ApiRecordError(Errors.INVALID_RECORD,
-                new RecordError(batchIndex, "Record " + record
+                new RecordError(recordIndex, "Record " + record
                     + "'s magic does not match outer magic " + batch.magic() + " in topic partition "
                     + topicPartition)));
         }
@@ -565,22 +566,22 @@
             }
         }

-        Optional<ApiRecordError> keyError = validateKey(record, batchIndex, topicPartition,
+        Optional<ApiRecordError> keyError = validateKey(record, recordIndex, topicPartition,
             compactedTopic, metricsRecorder);
         if (keyError.isPresent())
             return keyError;
         else
-            return validateTimestamp(batch, record, batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs);
+            return validateTimestamp(batch, record, recordIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs);
     }

     private static Optional<ApiRecordError> validateKey(Record record,
-                                                        int batchIndex,
+                                                        int recordIndex,
                                                         TopicPartition topicPartition,
                                                         boolean compactedTopic,
                                                         MetricsRecorder metricsRecorder) {
         if (compactedTopic && !record.hasKey()) {
             metricsRecorder.recordNoKeyCompactedTopic();
-            return Optional.of(new ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex,
+            return Optional.of(new ApiRecordError(Errors.INVALID_RECORD, new RecordError(recordIndex,
                 "Compacted topic cannot accept message without key in topic partition "
                 + topicPartition)));
         } else
@@ -589,20 +590,20 @@
     private static Optional<ApiRecordError> validateTimestamp(RecordBatch batch,
                                                               Record record,
-                                                              int batchIndex,
+                                                              int recordIndex,
                                                               long now,
                                                               TimestampType timestampType,
                                                               long timestampBeforeMaxMs,
                                                               long timestampAfterMaxMs) {
         if (timestampType == TimestampType.CREATE_TIME && record.timestamp() != RecordBatch.NO_TIMESTAMP) {
             if (recordHasInvalidTimestamp(record, now, timestampBeforeMaxMs, timestampAfterMaxMs)) {
-                return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
+                return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(recordIndex,
                     "Timestamp " + record.timestamp() + " of message with offset " + record.offset()
                         + " is out of range. The timestamp should be within [" + (now - timestampBeforeMaxMs)
                         + ", " + (now + timestampAfterMaxMs) + "]")));
             }
         } else if (batch.timestampType() == TimestampType.LOG_APPEND_TIME)
-            return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex,
+            return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(recordIndex,
                 "Invalid timestamp type in message " + record + ". Producer should not set timestamp "
                 + "type to LogAppendTime.")));
         return Optional.empty();
@@ -618,10 +619,10 @@
     }

     private static Optional<ApiRecordError> validateRecordCompression(CompressionType sourceCompression,
-                                                                      int batchIndex,
+                                                                      int recordIndex,
                                                                       Record record) {
         if (sourceCompression != CompressionType.NONE && record.isCompressed())
-            return Optional.of(new ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex,
+            return Optional.of(new ApiRecordError(Errors.INVALID_RECORD, new RecordError(recordIndex,
                 "Compressed outer record should not have an inner record with a compression attribute set: "
                 + record)));
         else
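
Finally, to make item 3 of the commit message concrete, here is a stripped-down, hypothetical loop (not the LogValidator code itself; `RecordError` below is a local stand-in for `ProduceResponse.RecordError`) showing why `recordIndex` is the accurate name: the counter indexes records within a single batch and is what ends up in each record-level error.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordIndexSketch {
    // Local stand-in for org.apache.kafka.common.requests.ProduceResponse.RecordError.
    record RecordError(int recordIndex, String message) {}

    public static void main(String[] args) {
        List<String> batch = List.of("key1:v1", ":v2", "key3:v3"); // one entry per record; the second has no key
        List<RecordError> recordErrors = new ArrayList<>();
        int recordIndex = 0;                                       // index of the record within this batch
        for (String record : batch) {
            if (record.startsWith(":"))                            // stand-in for a per-record validation failure
                recordErrors.add(new RecordError(recordIndex, "record has no key"));
            ++recordIndex;
        }
        System.out.println(recordErrors); // [RecordError[recordIndex=1, message=record has no key]]
    }
}
```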