diff --git a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
index d9150e5044d..79ce2c83f28 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ConvertedRecords.java
@@ -19,18 +19,18 @@ package org.apache.kafka.common.record;
 public class ConvertedRecords<T extends Records> {
 
     private final T records;
-    private final RecordConversionStats recordConversionStats;
+    private final RecordValidationStats recordValidationStats;
 
-    public ConvertedRecords(T records, RecordConversionStats recordConversionStats) {
+    public ConvertedRecords(T records, RecordValidationStats recordValidationStats) {
         this.records = records;
-        this.recordConversionStats = recordConversionStats;
+        this.recordValidationStats = recordValidationStats;
     }
 
     public T records() {
         return records;
     }
 
-    public RecordConversionStats recordConversionStats() {
-        return recordConversionStats;
+    public RecordValidationStats recordConversionStats() {
+        return recordValidationStats;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 6ff9b390965..23a88c277c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -284,7 +284,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             // are not enough available bytes in the response to read it fully. Note that this is
             // only possible prior to KIP-74, after which the broker was changed to always return at least
             // one full record batch, even if it requires exceeding the max fetch size requested by the client.
- return new ConvertedRecords<>(this, RecordConversionStats.EMPTY); + return new ConvertedRecords<>(this, RecordValidationStats.EMPTY); } else { return convertedRecords; } diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java index f5f8dcecb67..001158b5a9a 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java @@ -36,14 +36,14 @@ public final class LazyDownConversionRecordsSend extends RecordsSend> convertedRecordsIterator; public LazyDownConversionRecordsSend(LazyDownConversionRecords records) { super(records, records.sizeInBytes()); convertedRecordsWriter = null; - recordConversionStats = new RecordConversionStats(); + recordValidationStats = new RecordValidationStats(); convertedRecordsIterator = records().iterator(MAX_READ_SIZE); } @@ -77,7 +77,7 @@ public final class LazyDownConversionRecordsSend extends RecordsSend recordsAndStats = convertedRecordsIterator.next(); convertedRecords = (MemoryRecords) recordsAndStats.records(); - recordConversionStats.add(recordsAndStats.recordConversionStats()); + recordValidationStats.add(recordsAndStats.recordConversionStats()); log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes()); } else { convertedRecords = buildOverflowBatch(remaining); @@ -97,8 +97,8 @@ public final class LazyDownConversionRecordsSend extends RecordsSend sendQueue; private final long size; - private Map recordConversionStats; + private Map recordConversionStats; private long totalWritten = 0; private Send current; @@ -114,7 +114,7 @@ public class MultiRecordsSend implements Send { * Get any statistics that were recorded as part of executing this {@link MultiRecordsSend}. * @return Records processing statistics (could be null if no statistics were collected) */ - public Map recordConversionStats() { + public Map recordConversionStats() { return recordConversionStats; } diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordConversionStats.java b/clients/src/main/java/org/apache/kafka/common/record/RecordValidationStats.java similarity index 78% rename from clients/src/main/java/org/apache/kafka/common/record/RecordConversionStats.java rename to clients/src/main/java/org/apache/kafka/common/record/RecordValidationStats.java index 4f0bca527fb..4dfcf66c0ae 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordConversionStats.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordValidationStats.java @@ -16,25 +16,30 @@ */ package org.apache.kafka.common.record; -public class RecordConversionStats { +/** + * This class tracks resource usage during broker record validation for eventual reporting in metrics. + * Record validation covers integrity checks on inbound data (e.g. checksum verification), structural + * validation to make sure that records are well-formed, and conversion between record formats if needed. 
+ */ +public class RecordValidationStats { - public static final RecordConversionStats EMPTY = new RecordConversionStats(); + public static final RecordValidationStats EMPTY = new RecordValidationStats(); private long temporaryMemoryBytes; private int numRecordsConverted; private long conversionTimeNanos; - public RecordConversionStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) { + public RecordValidationStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) { this.temporaryMemoryBytes = temporaryMemoryBytes; this.numRecordsConverted = numRecordsConverted; this.conversionTimeNanos = conversionTimeNanos; } - public RecordConversionStats() { + public RecordValidationStats() { this(0, 0, 0); } - public void add(RecordConversionStats stats) { + public void add(RecordValidationStats stats) { temporaryMemoryBytes += stats.temporaryMemoryBytes; numRecordsConverted += stats.numRecordsConverted; conversionTimeNanos += stats.conversionTimeNanos; @@ -64,7 +69,7 @@ public class RecordConversionStats { @Override public String toString() { - return String.format("RecordConversionStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)", + return String.format("RecordValidationStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)", temporaryMemoryBytes, numRecordsConverted, conversionTimeNanos); } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java index 423d1e1656f..8328e206146 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java @@ -96,7 +96,7 @@ public class RecordsUtil { } buffer.flip(); - RecordConversionStats stats = new RecordConversionStats(temporaryMemoryBytes, numRecordsConverted, + RecordValidationStats stats = new RecordValidationStats(temporaryMemoryBytes, numRecordsConverted, time.nanoseconds() - startNanos); return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 4f3f03c3f2d..5616fb23f7d 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -801,7 +801,7 @@ public class MemoryRecordsBuilderTest { } } - private void verifyRecordsProcessingStats(CompressionType compressionType, RecordConversionStats processingStats, + private void verifyRecordsProcessingStats(CompressionType compressionType, RecordValidationStats processingStats, int numRecords, int numRecordsConverted, long finalBytes, long preConvertedBytes) { assertNotNull(processingStats, "Records processing info is null"); diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index d1fff6783d8..5cd5014c502 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -818,7 +818,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs) appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs) appendInfo.setLastOffset(offset.value - 1) - 
appendInfo.setRecordConversionStats(validateAndOffsetAssignResult.recordConversionStats) + appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats) if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs) @@ -1188,7 +1188,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, OptionalInt.empty() new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, offsetOfMaxTimestamp, - RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, sourceCompression, + RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression, validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 3a819a3d184..2b27d4647ac 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -111,7 +111,7 @@ class KafkaApis(val requestChannel: RequestChannel, val clientMetricsManager: Option[ClientMetricsManager] ) extends ApiRequestHandler with Logging { - type FetchResponseStats = Map[TopicPartition, RecordConversionStats] + type FetchResponseStats = Map[TopicPartition, RecordValidationStats] this.logIdent = "[KafkaApi-%d] ".format(brokerId) val configHelper = new ConfigHelper(metadataCache, config, configRepository) val authHelper = new AuthHelper(authorizer) @@ -722,7 +722,7 @@ class KafkaApis(val requestChannel: RequestChannel, entriesPerPartition = authorizedRequestInfo, requestLocal = requestLocal, responseCallback = sendResponseCallback, - recordConversionStatsCallback = processingStatsCallback, + recordValidationStatsCallback = processingStatsCallback, transactionalId = produceRequest.transactionalId() ) @@ -3761,7 +3761,7 @@ class KafkaApis(val requestChannel: RequestChannel, private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, - conversionStats: RecordConversionStats): Unit = { + conversionStats: RecordValidationStats): Unit = { val conversionCount = conversionStats.numRecordsConverted if (conversionCount > 0) { request.header.apiKey match { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index dfd50fdec6c..681073311a7 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -763,7 +763,7 @@ class ReplicaManager(val config: KafkaConfig, * @param entriesPerPartition the records per partition to be appended * @param responseCallback callback for sending the response * @param delayedProduceLock lock for the delayed actions - * @param recordConversionStatsCallback callback for updating stats on record conversions + * @param recordValidationStatsCallback callback for updating stats on record conversions * @param requestLocal container for the stateful instances scoped to this request * @param transactionalId transactional ID if the request is from a producer and the producer is transactional * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default. 
@@ -775,7 +775,7 @@ class ReplicaManager(val config: KafkaConfig, entriesPerPartition: Map[TopicPartition, MemoryRecords], responseCallback: Map[TopicPartition, PartitionResponse] => Unit, delayedProduceLock: Option[Lock] = None, - recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (), + recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.NoCaching, transactionalId: String = null, actionQueue: ActionQueue = this.defaultActionQueue): Unit = { @@ -795,7 +795,7 @@ class ReplicaManager(val config: KafkaConfig, if (notYetVerifiedEntriesPerPartition.isEmpty || addPartitionsToTxnManager.isEmpty) { appendEntries(verifiedEntriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap, - errorsPerPartition, recordConversionStatsCallback, timeout, responseCallback, delayedProduceLock, actionQueue)(requestLocal, Map.empty) + errorsPerPartition, recordValidationStatsCallback, timeout, responseCallback, delayedProduceLock, actionQueue)(requestLocal, Map.empty) } else { // For unverified entries, send a request to verify. When verified, the append process will proceed via the callback. // We verify above that all partitions use the same producer ID. @@ -813,7 +813,7 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks, verificationGuards.toMap, errorsPerPartition, - recordConversionStatsCallback, + recordValidationStatsCallback, timeout, responseCallback, delayedProduceLock, @@ -847,7 +847,7 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks: Short, verificationGuards: Map[TopicPartition, VerificationGuard], errorsPerPartition: Map[TopicPartition, Errors], - recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit, + recordConversionStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit, timeout: Long, responseCallback: Map[TopicPartition, PartitionResponse] => Unit, delayedProduceLock: Option[Lock], @@ -920,7 +920,7 @@ class ReplicaManager(val config: KafkaConfig, } } - recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats }) + recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordValidationStats }) if (delayedProduceRequestRequired(requiredAcks, allEntries, allResults)) { // create delayed produce operation diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 9b8c02249aa..f4ef0d4a9fd 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -28,7 +28,7 @@ import kafka.utils._ import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordConversionStats} +import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.server.util.timer.MockTimer import org.apache.kafka.server.util.{MockScheduler, MockTime} @@ -177,7 +177,7 @@ object AbstractCoordinatorConcurrencyTest { entriesPerPartition: Map[TopicPartition, MemoryRecords], responseCallback: Map[TopicPartition, PartitionResponse] => Unit, 
delayedProduceLock: Option[Lock] = None, - processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (), + processingStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.NoCaching, transactionalId: String = null, actionQueue: ActionQueue = null): Unit = { diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 6b781f6fa69..5d900223a71 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -177,7 +177,7 @@ class LogValidatorTest { val expectedMaxTimestampOffset = if (magic >= RecordBatch.MAGIC_VALUE_V2) 2 else 0 assertEquals(expectedMaxTimestampOffset, validatedResults.shallowOffsetOfMaxTimestampMs, s"The offset of max timestamp should be $expectedMaxTimestampOffset") - verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records, + verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = false) } @@ -224,8 +224,8 @@ class LogValidatorTest { assertTrue(validatedResults.messageSizeMaybeChanged, "Message size may have been changed") - val stats = validatedResults.recordConversionStats - verifyRecordConversionStats(stats, numConvertedRecords = 3, records, compressed = true) + val stats = validatedResults.recordValidationStats + verifyRecordValidationStats(stats, numConvertedRecords = 3, records, compressed = true) } @Test @@ -276,7 +276,7 @@ class LogValidatorTest { assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") - verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records, + verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = true) } @@ -350,11 +350,14 @@ class LogValidatorTest { (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH) - val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId, - producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, + val recordList = List( new SimpleRecord(timestampSeq(0), "hello".getBytes), new SimpleRecord(timestampSeq(1), "there".getBytes), - new SimpleRecord(timestampSeq(2), "beautiful".getBytes)) + new SimpleRecord(timestampSeq(2), "beautiful".getBytes) + ) + + val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId, + producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, recordList: _*) val offsetCounter = PrimitiveRef.ofLong(0); val validatingResults = new LogValidator(records, @@ -399,12 +402,20 @@ class LogValidatorTest { assertEquals(i, offsetCounter.value); assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs, + + val expectedShallowOffsetOfMaxTimestamp = if (magic >= RecordVersion.V2.value) { + // v2 records are always batched, even when not compressed. 
+ // the shallow offset of max timestamp is the last offset of the batch + recordList.size - 1 + } else { + 1 + } + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatingResults.shallowOffsetOfMaxTimestampMs, s"Offset of max timestamp should be 1") + assertFalse(validatingResults.messageSizeMaybeChanged, "Message size should not have been changed") - - verifyRecordConversionStats(validatingResults.recordConversionStats, numConvertedRecords = 0, records, + verifyRecordValidationStats(validatingResults.recordValidationStats, numConvertedRecords = 0, records, compressed = false) } @@ -478,7 +489,7 @@ class LogValidatorTest { assertTrue(validatingResults.messageSizeMaybeChanged, "Message size should have been changed") - verifyRecordConversionStats(validatingResults.recordConversionStats, numConvertedRecords = 3, records, + verifyRecordValidationStats(validatingResults.recordValidationStats, numConvertedRecords = 3, records, compressed = true) } @@ -529,7 +540,7 @@ class LogValidatorTest { s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") - verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, + verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, compressed = true) } @@ -576,7 +587,7 @@ class LogValidatorTest { s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}") assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed") - verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, + verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, compressed = true) } @@ -596,11 +607,14 @@ class LogValidatorTest { (RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH) - val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId, - producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, + val recordList = List( new SimpleRecord(timestampSeq(0), "hello".getBytes), new SimpleRecord(timestampSeq(1), "there".getBytes), - new SimpleRecord(timestampSeq(2), "beautiful".getBytes)) + new SimpleRecord(timestampSeq(2), "beautiful".getBytes) + ) + + val records = MemoryRecords.withRecords(magic, 0L, CompressionType.GZIP, TimestampType.CREATE_TIME, producerId, + producerEpoch, baseSequence, partitionLeaderEpoch, isTransactional, recordList: _*) val validatedResults = new LogValidator(records, topicPartition, @@ -639,11 +653,15 @@ class LogValidatorTest { } } assertEquals(now + 1, validatedResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") - assertEquals(validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestampMs, + + // All versions have an outer batch when compressed, so the shallow offset + // of max timestamp is always the offset of the last record in the batch. 
+ val expectedShallowOffsetOfMaxTimestamp = recordList.size - 1 + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestampMs, s"Offset of max timestamp should be ${validatedRecords.records.asScala.size - 1}") assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed") - verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 0, records, + verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records, compressed = true) } @@ -926,7 +944,7 @@ class LogValidatorTest { PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) checkOffsets(validatedResults.validatedRecords, offset) - verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, + verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, compressed = false) } @@ -952,7 +970,7 @@ class LogValidatorTest { PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) checkOffsets(validatedResults.validatedRecords, offset) - verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, + verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, compressed = false) } @@ -978,7 +996,7 @@ class LogValidatorTest { PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) checkOffsets(validatedResults.validatedRecords, offset) - verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, + verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, compressed = true) } @@ -1004,7 +1022,7 @@ class LogValidatorTest { PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching.bufferSupplier ) checkOffsets(validatedResults.validatedRecords, offset) - verifyRecordConversionStats(validatedResults.recordConversionStats, numConvertedRecords = 3, records, + verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 3, records, compressed = true) } @@ -1641,7 +1659,7 @@ class LogValidatorTest { } } - def verifyRecordConversionStats(stats: RecordConversionStats, numConvertedRecords: Int, records: MemoryRecords, + def verifyRecordValidationStats(stats: RecordValidationStats, numConvertedRecords: Int, records: MemoryRecords, compressed: Boolean): Unit = { assertNotNull(stats, "Records processing info is null") assertEquals(numConvertedRecords, stats.numRecordsConverted) diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala index 94908caef5d..49efa0a49ba 100644 --- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala +++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala @@ -109,7 +109,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint, offsetOfMaxTimestamp, Time.SYSTEM.milliseconds(), state.logStartOffset, - RecordConversionStats.EMPTY, + RecordValidationStats.EMPTY, CompressionType.NONE, FetchResponse.recordsSize(partitionData), batches.headOption.map(_.lastOffset).getOrElse(-1))) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 
7258999cd29..053583fd889 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -30,13 +30,13 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordConversionStats, SimpleRecord} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord} import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, UpdateMetadataRequest} import org.apache.kafka.common.utils.{LogContext, SystemTime} import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch} import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 -import org.apache.kafka.storage.internals.log.{LogAppendInfo} +import org.apache.kafka.storage.internals.log.LogAppendInfo import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -772,7 +772,7 @@ class ReplicaFetcherThreadTest { -1L, RecordBatch.NO_TIMESTAMP, -1L, - RecordConversionStats.EMPTY, + RecordValidationStats.EMPTY, CompressionType.NONE, -1, // No records. -1L diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java index 0ecaaabe223..76b9bef7d2c 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java @@ -18,7 +18,7 @@ package org.apache.kafka.storage.internals.log; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.RecordConversionStats; +import org.apache.kafka.common.record.RecordValidationStats; import org.apache.kafka.common.requests.ProduceResponse.RecordError; import java.util.Collections; @@ -32,7 +32,7 @@ public class LogAppendInfo { public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, - RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L); + RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L); private long firstOffset; private long lastOffset; @@ -40,7 +40,7 @@ public class LogAppendInfo { private long offsetOfMaxTimestamp; private long logAppendTime; private long logStartOffset; - private RecordConversionStats recordConversionStats; + private RecordValidationStats recordValidationStats; private final OptionalInt lastLeaderEpoch; private final CompressionType sourceCompression; @@ -60,7 +60,7 @@ public class LogAppendInfo { * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp * @param logStartOffset The start offset of the log at the time of this append. 
- * @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false` + * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` * @param sourceCompression The source codec used in the message set (send by the producer) * @param validBytes The number of valid bytes * @param lastOffsetOfFirstBatch The last offset of the first batch @@ -72,12 +72,12 @@ public class LogAppendInfo { long offsetOfMaxTimestamp, long logAppendTime, long logStartOffset, - RecordConversionStats recordConversionStats, + RecordValidationStats recordValidationStats, CompressionType sourceCompression, int validBytes, long lastOffsetOfFirstBatch) { this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, - recordConversionStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(), + recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(), LeaderHwChange.NONE); } @@ -92,7 +92,7 @@ public class LogAppendInfo { * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp * @param logStartOffset The start offset of the log at the time of this append. - * @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false` + * @param recordValidationStats Statistics collected during record processing, `null` if `assignOffsets` is `false` * @param sourceCompression The source codec used in the message set (send by the producer) * @param validBytes The number of valid bytes * @param lastOffsetOfFirstBatch The last offset of the first batch @@ -107,7 +107,7 @@ public class LogAppendInfo { long offsetOfMaxTimestamp, long logAppendTime, long logStartOffset, - RecordConversionStats recordConversionStats, + RecordValidationStats recordValidationStats, CompressionType sourceCompression, int validBytes, long lastOffsetOfFirstBatch, @@ -120,7 +120,7 @@ public class LogAppendInfo { this.offsetOfMaxTimestamp = offsetOfMaxTimestamp; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; - this.recordConversionStats = recordConversionStats; + this.recordValidationStats = recordValidationStats; this.sourceCompression = sourceCompression; this.validBytes = validBytes; this.lastOffsetOfFirstBatch = lastOffsetOfFirstBatch; @@ -180,12 +180,12 @@ public class LogAppendInfo { this.logStartOffset = logStartOffset; } - public RecordConversionStats recordConversionStats() { - return recordConversionStats; + public RecordValidationStats recordValidationStats() { + return recordValidationStats; } - public void setRecordConversionStats(RecordConversionStats recordConversionStats) { - this.recordConversionStats = recordConversionStats; + public void setRecordValidationStats(RecordValidationStats recordValidationStats) { + this.recordValidationStats = recordValidationStats; } public CompressionType sourceCompression() { @@ -233,13 +233,13 @@ public class LogAppendInfo { * @return a new instance with the given LeaderHwChange */ public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) { - return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset, recordConversionStats, + return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, 
offsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange); } public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStartOffset) { return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, - RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L); + RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L); } /** @@ -249,7 +249,7 @@ public class LogAppendInfo { */ public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List recordErrors) { return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, - RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE); + RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE); } @Override @@ -262,7 +262,7 @@ public class LogAppendInfo { ", offsetOfMaxTimestamp=" + offsetOfMaxTimestamp + ", logAppendTime=" + logAppendTime + ", logStartOffset=" + logStartOffset + - ", recordConversionStats=" + recordConversionStats + + ", recordConversionStats=" + recordValidationStats + ", sourceCompression=" + sourceCompression + ", validBytes=" + validBytes + ", lastOffsetOfFirstBatch=" + lastOffsetOfFirstBatch + diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java index a05512a0196..378247fa270 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java @@ -38,7 +38,7 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder.RecordsInfo; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.RecordConversionStats; +import org.apache.kafka.common.record.RecordValidationStats; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.ProduceResponse.RecordError; import org.apache.kafka.common.utils.BufferSupplier; @@ -70,17 +70,17 @@ public class LogValidator { public final long maxTimestampMs; public final long shallowOffsetOfMaxTimestampMs; public final boolean messageSizeMaybeChanged; - public final RecordConversionStats recordConversionStats; + public final RecordValidationStats recordValidationStats; public ValidationResult(long logAppendTimeMs, MemoryRecords validatedRecords, long maxTimestampMs, - long shallowOffsetOfMaxTimestampMs, boolean messageSizeMaybeChanged, - RecordConversionStats recordConversionStats) { + long shallowOffsetOfMaxTimestampMs, boolean messageSizeMaybeChanged, + RecordValidationStats recordValidationStats) { this.logAppendTimeMs = logAppendTimeMs; this.validatedRecords = validatedRecords; this.maxTimestampMs = maxTimestampMs; this.shallowOffsetOfMaxTimestampMs = shallowOffsetOfMaxTimestampMs; this.messageSizeMaybeChanged = messageSizeMaybeChanged; - this.recordConversionStats = recordConversionStats; + this.recordValidationStats = recordValidationStats; } } @@ -208,16 +208,16 @@ public class LogValidator { validateBatch(topicPartition, firstBatch, batch, origin, toMagic, metricsRecorder); List recordErrors = new ArrayList<>(0); - int batchIndex = 0; + int 
recordIndex = 0; for (Record record : batch) { Optional recordError = validateRecord(batch, topicPartition, - record, batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, + record, recordIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder); recordError.ifPresent(e -> recordErrors.add(e)); // we fail the batch if any record fails, so we stop appending if any record fails if (recordErrors.isEmpty()) builder.appendWithOffset(offsetCounter.value++, record); - ++batchIndex; + ++recordIndex; } processRecordErrors(recordErrors); @@ -226,7 +226,7 @@ public class LogValidator { MemoryRecords convertedRecords = builder.build(); RecordsInfo info = builder.info(); - RecordConversionStats recordConversionStats = new RecordConversionStats( + RecordValidationStats recordValidationStats = new RecordValidationStats( builder.uncompressedBytesWritten(), builder.numRecords(), time.nanoseconds() - startNanos); return new ValidationResult( now, @@ -234,7 +234,7 @@ public class LogValidator { info.maxTimestamp, info.shallowOffsetOfMaxTimestamp, true, - recordConversionStats); + recordValidationStats); } // Visible for benchmarking @@ -257,18 +257,18 @@ public class LogValidator { // This is a hot path and we want to avoid any unnecessary allocations. // That said, there is no benefit in using `skipKeyValueIterator` for the uncompressed // case since we don't do key/value copies in this path (we just slice the ByteBuffer) - int batchIndex = 0; + int recordIndex = 0; for (Record record : batch) { Optional recordError = validateRecord(batch, topicPartition, record, - batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder); - recordError.ifPresent(e -> recordErrors.add(e)); + recordIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder); + recordError.ifPresent(recordErrors::add); long offset = offsetCounter.value++; if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && record.timestamp() > maxBatchTimestamp) { maxBatchTimestamp = record.timestamp(); offsetOfMaxBatchTimestamp = offset; } - ++batchIndex; + ++recordIndex; } processRecordErrors(recordErrors); @@ -293,10 +293,11 @@ public class LogValidator { if (timestampType == TimestampType.LOG_APPEND_TIME) { maxTimestamp = now; - if (toMagic >= RecordBatch.MAGIC_VALUE_V2) - offsetOfMaxTimestamp = offsetCounter.value - 1; - else - offsetOfMaxTimestamp = initialOffset; + offsetOfMaxTimestamp = initialOffset; + } + + if (toMagic >= RecordBatch.MAGIC_VALUE_V2) { + offsetOfMaxTimestamp = offsetCounter.value - 1; } return new ValidationResult( @@ -305,7 +306,7 @@ public class LogValidator { maxTimestamp, offsetOfMaxTimestamp, false, - RecordConversionStats.EMPTY); + RecordValidationStats.EMPTY); } /** @@ -362,15 +363,15 @@ public class LogValidator { try { List recordErrors = new ArrayList<>(0); // this is a hot path and we want to avoid any unnecessary allocations. 
- int batchIndex = 0; + int recordIndex = 0; while (recordsIterator.hasNext()) { Record record = recordsIterator.next(); long expectedOffset = expectedInnerOffset.value++; Optional recordError = validateRecordCompression(sourceCompression, - batchIndex, record); + recordIndex, record); if (!recordError.isPresent()) { - recordError = validateRecord(batch, topicPartition, record, batchIndex, now, + recordError = validateRecord(batch, topicPartition, record, recordIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder); } @@ -396,7 +397,7 @@ public class LogValidator { validatedRecords.add(record); } - ++batchIndex; + ++recordIndex; } processRecordErrors(recordErrors); @@ -425,14 +426,14 @@ public class LogValidator { if (toMagic >= RecordBatch.MAGIC_VALUE_V2) firstBatch.setPartitionLeaderEpoch(partitionLeaderEpoch); - RecordConversionStats recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0); + RecordValidationStats recordValidationStats = new RecordValidationStats(uncompressedSizeInBytes, 0, 0); return new ValidationResult( now, records, maxTimestamp, lastOffset, false, - recordConversionStats); + recordValidationStats); } } @@ -464,7 +465,7 @@ public class LogValidator { // message format V0 or if the inner offsets are not consecutive. This is OK since the impact is the same: we have // to rebuild the records (including recompression if enabled). int conversionCount = builder.numRecords(); - RecordConversionStats recordConversionStats = new RecordConversionStats( + RecordValidationStats recordValidationStats = new RecordValidationStats( uncompressedSizeInBytes + builder.uncompressedBytesWritten(), conversionCount, time.nanoseconds() - startNanos); @@ -474,7 +475,7 @@ public class LogValidator { info.maxTimestamp, info.shallowOffsetOfMaxTimestamp, true, - recordConversionStats); + recordValidationStats); } @@ -537,7 +538,7 @@ public class LogValidator { private static Optional validateRecord(RecordBatch batch, TopicPartition topicPartition, Record record, - int batchIndex, + int recordIndex, long now, TimestampType timestampType, long timestampBeforeMaxMs, @@ -547,7 +548,7 @@ public class LogValidator { if (!record.hasMagic(batch.magic())) { metricsRecorder.recordInvalidMagic(); return Optional.of(new ApiRecordError(Errors.INVALID_RECORD, - new RecordError(batchIndex, "Record " + record + new RecordError(recordIndex, "Record " + record + "'s magic does not match outer magic " + batch.magic() + " in topic partition " + topicPartition))); } @@ -565,22 +566,22 @@ public class LogValidator { } } - Optional keyError = validateKey(record, batchIndex, topicPartition, + Optional keyError = validateKey(record, recordIndex, topicPartition, compactedTopic, metricsRecorder); if (keyError.isPresent()) return keyError; else - return validateTimestamp(batch, record, batchIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs); + return validateTimestamp(batch, record, recordIndex, now, timestampType, timestampBeforeMaxMs, timestampAfterMaxMs); } private static Optional validateKey(Record record, - int batchIndex, + int recordIndex, TopicPartition topicPartition, boolean compactedTopic, MetricsRecorder metricsRecorder) { if (compactedTopic && !record.hasKey()) { metricsRecorder.recordNoKeyCompactedTopic(); - return Optional.of(new ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex, + return Optional.of(new ApiRecordError(Errors.INVALID_RECORD, new RecordError(recordIndex, "Compacted topic cannot accept 
message without key in topic partition " + topicPartition))); } else @@ -589,20 +590,20 @@ public class LogValidator { private static Optional validateTimestamp(RecordBatch batch, Record record, - int batchIndex, + int recordIndex, long now, TimestampType timestampType, long timestampBeforeMaxMs, long timestampAfterMaxMs) { if (timestampType == TimestampType.CREATE_TIME && record.timestamp() != RecordBatch.NO_TIMESTAMP) { if (recordHasInvalidTimestamp(record, now, timestampBeforeMaxMs, timestampAfterMaxMs)) { - return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex, + return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(recordIndex, "Timestamp " + record.timestamp() + " of message with offset " + record.offset() + " is out of range. The timestamp should be within [" + (now - timestampBeforeMaxMs) + ", " + (now + timestampAfterMaxMs) + "]"))); } } else if (batch.timestampType() == TimestampType.LOG_APPEND_TIME) - return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(batchIndex, + return Optional.of(new ApiRecordError(Errors.INVALID_TIMESTAMP, new RecordError(recordIndex, "Invalid timestamp type in message " + record + ". Producer should not set timestamp " + "type to LogAppendTime."))); return Optional.empty(); @@ -618,10 +619,10 @@ public class LogValidator { } private static Optional validateRecordCompression(CompressionType sourceCompression, - int batchIndex, + int recordIndex, Record record) { if (sourceCompression != CompressionType.NONE && record.isCompressed()) - return Optional.of(new ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex, + return Optional.of(new ApiRecordError(Errors.INVALID_RECORD, new RecordError(recordIndex, "Compressed outer record should not have an inner record with a compression attribute set: " + record))); else
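
For context, a minimal usage sketch of the renamed stats class (org.apache.kafka.common.record.RecordValidationStats). It assumes a kafka-clients build that already contains the rename; the topic name and the byte/time figures passed to the constructor are illustrative only. The per-partition map mirrors the shape handed to recordValidationStatsCallback in ReplicaManager.appendRecords and inspected in KafkaApis.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordValidationStats;

public class RecordValidationStatsSketch {
    public static void main(String[] args) {
        // Per-partition stats, the same shape ReplicaManager hands to recordValidationStatsCallback.
        Map<TopicPartition, RecordValidationStats> perPartition = new HashMap<>();
        // Constructor arguments: temporaryMemoryBytes, numRecordsConverted, conversionTimeNanos (illustrative values).
        perPartition.put(new TopicPartition("demo-topic", 0), new RecordValidationStats(4096L, 3, 1_500_000L));
        perPartition.put(new TopicPartition("demo-topic", 1), RecordValidationStats.EMPTY);

        // Merge into a single total with add(), the same way LazyDownConversionRecordsSend
        // folds per-chunk down-conversion stats into one value per partition.
        RecordValidationStats total = new RecordValidationStats();
        perPartition.values().forEach(total::add);

        // KafkaApis.updateRecordConversionStats only records conversion metrics when numRecordsConverted > 0.
        if (total.numRecordsConverted() > 0)
            System.out.println("record conversion occurred: " + total);
    }
}

The no-arg constructor plus add() is the aggregation pattern the patch keeps after the rename: callers accumulate into a mutable instance and only emit conversion metrics when something was actually converted.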
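
The updated LogValidatorTest comments capture the shallowOffsetOfMaxTimestampMs rule: v2 records are always batched, and compression wraps older formats in an outer batch too, so both cases report the last offset of the batch, while uncompressed v0/v1 records report the offset of the record carrying the max timestamp. A hypothetical helper (not part of the patch) restating that rule for the CREATE_TIME cases exercised by the tests:

import org.apache.kafka.common.record.RecordBatch;

final class ShallowOffsetOfMaxTimestampRule {

    // Which offset a validated CREATE_TIME batch is expected to report as the
    // "shallow" offset of its maximum timestamp.
    static long expectedShallowOffset(byte toMagic,
                                      boolean compressed,
                                      long lastOffsetOfBatch,
                                      long offsetOfMaxTimestampRecord) {
        if (toMagic >= RecordBatch.MAGIC_VALUE_V2 || compressed)
            return lastOffsetOfBatch;          // whole-batch granularity
        return offsetOfMaxTimestampRecord;     // standalone v0/v1 records
    }

    public static void main(String[] args) {
        // Three records, max timestamp on the second (offset 1), last offset 2 --
        // the same shape as the uncompressed test in LogValidatorTest.
        System.out.println(expectedShallowOffset(RecordBatch.MAGIC_VALUE_V1, false, 2L, 1L)); // 1
        System.out.println(expectedShallowOffset(RecordBatch.MAGIC_VALUE_V2, false, 2L, 1L)); // 2
        System.out.println(expectedShallowOffset(RecordBatch.MAGIC_VALUE_V0, true, 2L, 1L));  // 2
    }
}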