KAFKA-17104 InvalidMessageCrcRecordsPerSec is not updated in validating LegacyRecord (#16558)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
TaiJuWu 2024-07-24 19:25:11 +08:00 committed by GitHub
parent 539f466ccb
commit 4fa1c21940
3 changed files with 54 additions and 2 deletions
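
For context, the broker meter at issue is BrokerTopicStats.InvalidMessageCrcRecordsPerSec, which should move whenever a legacy (message-format v0/v1) record fails CRC validation. The following is a minimal standalone sketch, not part of the commit, of the failure mode the new test reproduces: the class name and main-method harness are illustrative, while the LegacyRecord calls (create, buffer, CRC_OFFSET, ensureValid) are the same ones the test in this diff uses. Overwriting a CRC byte makes ensureValid() throw CorruptRecordException, and it is this exception that LogValidator must intercept to update the meter.

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.errors.CorruptRecordException;
    import org.apache.kafka.common.record.LegacyRecord;
    import org.apache.kafka.common.record.RecordBatch;

    public class CrcMismatchSketch {
        public static void main(String[] args) {
            // Build a magic v1 record, then overwrite one CRC byte so the stored
            // checksum no longer matches the record contents.
            LegacyRecord record = LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 0L, (byte[]) null, "hello".getBytes());
            ByteBuffer buf = record.buffer();
            buf.put(LegacyRecord.CRC_OFFSET, (byte) 0);

            try {
                record.ensureValid();
            } catch (CorruptRecordException e) {
                // This is the exception LogValidator has to catch so that
                // BrokerTopicStats.InvalidMessageCrcRecordsPerSec is updated.
                System.out.println("CRC mismatch detected: " + e.getMessage());
            }
        }
    }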

LegacyRecord.java

@@ -124,7 +124,7 @@ public final class LegacyRecord {
     }
 
     /**
-     * Throw an InvalidRecordException if isValid is false for this record
+     * Throw an CorruptRecordException if isValid is false for this record
      */
     public void ensureValid() {
         if (sizeInBytes() < RECORD_OVERHEAD_V0)

LogValidatorTest.scala

@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 import kafka.server.{BrokerTopicStats, RequestLocal}
 import kafka.utils.TestUtils.meterCount
 import org.apache.kafka.common.compress.{Compression, GzipCompression, Lz4Compression}
-import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{PrimitiveRef, Time}
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition}

@@ -33,6 +33,8 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin, LogValidator, Recor
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
 
 import scala.jdk.CollectionConverters._
@@ -687,6 +689,53 @@ class LogValidatorTest {
     verifyRecordValidationStats(validatedResults.recordValidationStats, numConvertedRecords = 0, records,
       compressed = true)
   }
 
+  @ParameterizedTest
+  @CsvSource(Array("0,gzip", "1,gzip", "0,lz4", "1,lz4", "0,snappy", "1,snappy"))
+  def testInvalidChecksum(code: Byte, compression: String): Unit = {
+    checkInvalidChecksum(code, Compression.of(compression).build(), CompressionType.forName(compression))
+  }
+
+  private def checkInvalidChecksum(magic: Byte, compression: Compression, compressionType: CompressionType): Unit = {
+    val record: LegacyRecord = LegacyRecord.create(magic, 0L, null, "hello".getBytes)
+    val buf: ByteBuffer = record.buffer
+    // corrupt the stored crc so that checksum validation fails
+    buf.put(LegacyRecord.CRC_OFFSET, 0.toByte)
+
+    val buffer: ByteBuffer = ByteBuffer.allocate(1024)
+    val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, magic, compression,
+      TimestampType.CREATE_TIME, 0L)
+    builder.appendUncheckedWithOffset(0, record)
+    val memoryRecords: MemoryRecords = builder.build
+
+    val logValidator: LogValidator = new LogValidator(
+      memoryRecords,
+      topicPartition,
+      time,
+      compressionType,
+      compression,
+      false,
+      magic,
+      TimestampType.CREATE_TIME,
+      1000L,
+      1000L,
+      RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      AppendOrigin.CLIENT,
+      MetadataVersion.latestTesting
+    )
+
+    assertThrows(classOf[CorruptRecordException], () =>
+      logValidator.validateMessagesAndAssignOffsets(
+        PrimitiveRef.ofLong(0),
+        metricsRecorder,
+        RequestLocal.withThreadConfinedCaching.bufferSupplier
+      )
+    )
+    assertEquals(metricsKeySet.count(_.getMBeanName.endsWith(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}")), 1)
+    assertTrue(meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0)
+  }
+
   @Test
   def testCompressedV2(): Unit = {

LogValidator.java

@@ -574,6 +574,9 @@ public class LogValidator {
                 if (batch.magic() <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed()) {
                     try {
                         record.ensureValid();
+                    } catch (CorruptRecordException e) {
+                        metricsRecorder.recordInvalidChecksums();
+                        throw e;
                     } catch (InvalidRecordException e) {
                         metricsRecorder.recordInvalidChecksums();
                         throw new CorruptRecordException(e.getMessage() + " in topic partition " + topicPartition);
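
Why the extra catch clause is needed (an explanatory note, not text from the commit): as the javadoc fix above reflects, LegacyRecord.ensureValid() reports a CRC mismatch by throwing CorruptRecordException, and CorruptRecordException is not a subclass of InvalidRecordException. The pre-existing handler therefore never fired for legacy CRC failures, so recordInvalidChecksums() was never called and InvalidMessageCrcRecordsPerSec never moved. The new clause records the metric and rethrows the original exception unchanged, so callers still see the same error as before.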