HOTFIX: Allow multi-batches for old format and no compression (#6871)

Reviewers: Jason Gustafson <jason@confluent.io>
Author: Guozhang Wang, 2019-06-03 16:56:28 -07:00 (committed via GitHub)
Parent: 55d07e717e
Commit: 573152dfa8
3 changed files with 74 additions and 56 deletions

AbstractRecords.java

@@ -43,6 +43,15 @@ public abstract class AbstractRecords implements Records {
         return true;
     }
 
+    public boolean firstBatchHasCompatibleMagic(byte magic) {
+        Iterator<? extends RecordBatch> iterator = batches().iterator();
+        if (!iterator.hasNext())
+            return true;
+        return iterator.next().magic() <= magic;
+    }
+
     /**
      * Get an iterator over the deep records.
      * @return An iterator over the records

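The new helper inspects only the first batch: an empty record set is trivially compatible, and otherwise the first batch's magic is compared against the given ceiling. A minimal sketch of the semantics, assuming the 2.3-era org.apache.kafka.common.record client API (the demo object itself is hypothetical, not part of this commit):

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}

    object FirstBatchMagicDemo extends App {
      // A single-batch record set written with the v2 (current) magic.
      val v2Records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
        new SimpleRecord("key".getBytes, "value".getBytes))

      // v2 > v1, so the first batch is not compatible with a v1 ceiling;
      // the validator below then enforces the single-batch rule.
      println(v2Records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) // false
      println(v2Records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V2)) // true

      // An empty record set is treated as compatible with any magic.
      println(MemoryRecords.EMPTY.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0)) // true
    }
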
LogValidator.scala

@@ -74,21 +74,18 @@ private[kafka] object LogValidator extends Logging {
     }
   }
 
-  private[kafka] def validateOneBatchRecords(records: MemoryRecords): RecordBatch = {
+  // Assume there's only one batch with compressed memory records; otherwise, throw InvalidRecordException
+  private[kafka] def validateOneBatchRecords(records: MemoryRecords) {
     val batchIterator = records.batches.iterator
 
     if (!batchIterator.hasNext) {
       throw new InvalidRecordException("Compressed outer record has no batches at all")
     }
 
-    val batch = batchIterator.next()
+    batchIterator.next()
 
     if (batchIterator.hasNext) {
       throw new InvalidRecordException("Compressed outer record has more than one batch")
     }
-
-    batch
   }
 
   private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
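Note the signature change: validateOneBatchRecords no longer returns the single batch, because its call sites below now iterate records.batches themselves. Exercising the check requires a record set that iterates as two batches; a sketch of how one can be assembled, mirroring the test helper createTwoBatchedRecords used further down (object and method names here are illustrative):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    object TwoBatchSketch {
      // Concatenate two independently built record sets into one buffer so
      // the result iterates as two batches.
      def twoBatches(magic: Byte, codec: CompressionType): MemoryRecords = {
        val first = MemoryRecords.withRecords(magic, 0L, codec, new SimpleRecord("1".getBytes))
        val second = MemoryRecords.withRecords(magic, 1L, codec, new SimpleRecord("2".getBytes))
        val buf = ByteBuffer.allocate(first.sizeInBytes + second.sizeInBytes)
        buf.put(first.buffer).put(second.buffer)
        buf.flip()
        MemoryRecords.readableRecords(buf)
      }
      // Feeding such a record set to validateOneBatchRecords throws
      // InvalidRecordException("Compressed outer record has more than one batch").
    }
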
@@ -197,11 +194,12 @@ private[kafka] object LogValidator extends Logging {
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
 
-    for (batch <- records.batches.asScala) {
-      if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-        validateOneBatchRecords(records)
-      }
+    if (!records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
+      // for v2 and beyond, we should check there's only one batch.
+      validateOneBatchRecords(records)
+    }
 
+    for (batch <- records.batches.asScala) {
       validateBatch(batch, isFromClient, magic)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
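Two things change here: the single-batch check is hoisted out of the per-batch loop, and it is keyed off the first batch's magic rather than each batch's. On this uncompressed path, v2 record sets must still contain exactly one batch, while v0/v1 record sets may now carry several. A one-line restatement of the gate (the helper name is illustrative, not from the commit):

    import org.apache.kafka.common.record.RecordBatch

    object UncompressedPathGate {
      // Enforce "exactly one batch" only for record formats newer than v1.
      def needsSingleBatchCheck(firstBatchMagic: Byte): Boolean =
        firstBatchMagic > RecordBatch.MAGIC_VALUE_V1
    }
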
@@ -280,38 +278,47 @@ private[kafka] object LogValidator extends Logging {
     var uncompressedSizeInBytes = 0
 
-    val batch = validateOneBatchRecords(records)
+    // Assume there's only one batch with compressed memory records; otherwise, throw InvalidRecordException.
+    // One exception though is that with format smaller than v2, if sourceCodec is noCompression, then each batch
+    // is actually a single record, so we need to special-case it by creating a single wrapper batch that includes
+    // all the records.
+    if (sourceCodec != NoCompressionCodec || !records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
+      validateOneBatchRecords(records)
+    }
 
-    validateBatch(batch, isFromClient, toMagic)
-    uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
+    val batches = records.batches.asScala
 
-    // Do not compress control records unless they are written compressed
-    if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
-      inPlaceAssignment = true
+    for (batch <- batches) {
+      validateBatch(batch, isFromClient, toMagic)
+      uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
 
-    for (record <- batch.asScala) {
-      if (sourceCodec != NoCompressionCodec && record.isCompressed)
-        throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-          s"compression attribute set: $record")
-      if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
-        throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
-          "are not allowed to use ZStandard compression")
-      validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+      // Do not compress control records unless they are written compressed
+      if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+        inPlaceAssignment = true
 
-      uncompressedSizeInBytes += record.sizeInBytes()
-      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-        // Check if we need to overwrite offset
-        // No in place assignment situation 3
-        if (record.offset != expectedInnerOffset.getAndIncrement())
-          inPlaceAssignment = false
-        if (record.timestamp > maxTimestamp)
-          maxTimestamp = record.timestamp
-      }
+      for (record <- batch.asScala) {
+        if (sourceCodec != NoCompressionCodec && record.isCompressed)
+          throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
+            s"compression attribute set: $record")
+        if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+          throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
+            "are not allowed to use ZStandard compression")
+        validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
 
-      // No in place assignment situation 4
-      if (!record.hasMagic(toMagic))
-        inPlaceAssignment = false
+        uncompressedSizeInBytes += record.sizeInBytes()
+        if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
+          // Check if we need to overwrite offset
+          // No in place assignment situation 3
+          if (record.offset != expectedInnerOffset.getAndIncrement())
+            inPlaceAssignment = false
+          if (record.timestamp > maxTimestamp)
+            maxTimestamp = record.timestamp
+        }
 
-      validatedRecords += record
+        // No in place assignment situation 4
+        if (!record.hasMagic(toMagic))
+          inPlaceAssignment = false
+
+        validatedRecords += record
+      }
     }
 
     if (!inPlaceAssignment) {
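On the compressed path the gate also consults the source codec: the single-batch check is skipped only when the source is uncompressed and the first batch is v0/v1, which is exactly the "old format and no compression" case of the commit title, where each legacy batch wraps a single record. A sketch of the combined condition, with the codec collapsed to a boolean (illustrative helper, same API assumptions as above):

    import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}

    object CompressedPathGate {
      // Mirrors: sourceCodec != NoCompressionCodec || !records.firstBatchHasCompatibleMagic(V1)
      def needsSingleBatchCheck(records: MemoryRecords, sourceIsCompressed: Boolean): Boolean =
        sourceIsCompressed || !records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)
    }
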

LogValidatorTest.scala

@@ -37,40 +37,42 @@ class LogValidatorTest {
   val time = Time.SYSTEM
 
   @Test
-  def testOnlyOneBatchCompressedV0(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
+  def testOnlyOneBatch(): Unit = {
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, CompressionType.GZIP)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, CompressionType.NONE)
+    checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, CompressionType.GZIP)
   }
 
   @Test
-  def testOnlyOneBatchCompressedV1(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
-  }
-
-  @Test
-  def testOnlyOneBatchCompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.GZIP)
-  }
-
-  @Test
-  def testOnlyOneBatchUncompressedV2(): Unit = {
-    checkOnlyOneBatchCompressed(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE)
-  }
-
-  private def checkOnlyOneBatchCompressed(magic: Byte, compressionType: CompressionType) {
-    validateMessages(createRecords(magic, 0L, compressionType), magic, compressionType)
+  def testAllowMultiBatch(): Unit = {
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, CompressionType.NONE)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.NONE)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, CompressionType.GZIP)
+    checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.GZIP)
+  }
 
+  private def checkOnlyOneBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) {
     assertThrows[InvalidRecordException] {
-      validateMessages(createTwoBatchedRecords(magic, 0L, compressionType), magic, compressionType)
+      validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
     }
   }
 
-  private def validateMessages(records: MemoryRecords, magic: Byte, compressionType: CompressionType): Unit = {
+  private def checkAllowMultiBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) {
+    validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
+  }
+
+  private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = {
     LogValidator.validateMessagesAndAssignOffsets(records,
       new LongRef(0L),
       time,
       now = 0L,
-      CompressionCodec.getCompressionCodec(compressionType.name),
-      CompressionCodec.getCompressionCodec(compressionType.name),
+      CompressionCodec.getCompressionCodec(sourceCompressionType.name),
+      CompressionCodec.getCompressionCodec(targetCompressionType.name),
       compactedTopic = false,
       magic,
       TimestampType.CREATE_TIME,