diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java index da14b5b494f..d58689de119 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.Time; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; @@ -57,13 +56,15 @@ public class LazyDownConversionRecords implements BaseRecords { // need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve // this is by having sizeInBytes method factor in the size of the first down-converted batch and return at least // its size. - AbstractIterator it = records.batchIterator(); + java.util.Iterator it = iterator(0); if (it.hasNext()) { - firstConvertedBatch = RecordsUtil.downConvert(Collections.singletonList(it.peek()), toMagic, firstOffset, time); + firstConvertedBatch = it.next(); sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes()); } else { + // If there are no messages we got after down-conversion, make sure we are able to send at least an overflow + // message to the consumer. Typically, the consumer would need to increase the fetch size in such cases. firstConvertedBatch = null; - sizeInBytes = 0; + sizeInBytes = LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH; } } @@ -148,21 +149,28 @@ public class LazyDownConversionRecords implements BaseRecords { return convertedBatch; } - if (!batchIterator.hasNext()) - return allDone(); + while (batchIterator.hasNext()) { + List batches = new ArrayList<>(); + boolean isFirstBatch = true; + long sizeSoFar = 0; - // Figure out batches we should down-convert based on the size constraints - List batches = new ArrayList<>(); - boolean isFirstBatch = true; - long sizeSoFar = 0; - while (batchIterator.hasNext() && - (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) { - RecordBatch currentBatch = batchIterator.next(); - batches.add(currentBatch); - sizeSoFar += currentBatch.sizeInBytes(); - isFirstBatch = false; + // Figure out batches we should down-convert based on the size constraints + while (batchIterator.hasNext() && + (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) { + RecordBatch currentBatch = batchIterator.next(); + batches.add(currentBatch); + sizeSoFar += currentBatch.sizeInBytes(); + isFirstBatch = false; + } + ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time); + // During conversion, it is possible that we drop certain batches because they do not have an equivalent + // representation in the message format we want to convert to. For example, V0 and V1 message formats + // have no notion of transaction markers which were introduced in V2 so they get dropped during conversion. + // We return converted records only when we have at least one valid batch of messages after conversion. + if (convertedRecords.records().sizeInBytes() > 0) + return convertedRecords; } - return RecordsUtil.downConvert(batches, toMagic, firstOffset, time); + return allDone(); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java index e60e1eda9f1..f0fab7d876d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; @@ -33,6 +32,7 @@ import java.util.Iterator; public final class LazyDownConversionRecordsSend extends RecordsSend { private static final Logger log = LoggerFactory.getLogger(LazyDownConversionRecordsSend.class); private static final int MAX_READ_SIZE = 128 * 1024; + static final int MIN_OVERFLOW_MESSAGE_LENGTH = Records.LOG_OVERHEAD; private RecordConversionStats recordConversionStats; private RecordsSend convertedRecordsWriter; @@ -49,39 +49,31 @@ public final class LazyDownConversionRecordsSend extends RecordsSend recordsAndStats = convertedRecordsIterator.next(); convertedRecords = recordsAndStats.records(); - - int sizeOfFirstConvertedBatch = convertedRecords.batchIterator().next().sizeInBytes(); - if (previouslyWritten == 0 && sizeOfFirstConvertedBatch > size()) - throw new EOFException("Unable to send first batch completely." + - " maximum_size: " + size() + - " converted_records_size: " + sizeOfFirstConvertedBatch); - recordConversionStats.add(recordsAndStats.recordConversionStats()); - log.debug("Got lazy converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes()); + log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes()); } else { - if (previouslyWritten == 0) - throw new EOFException("Unable to get the first batch of down-converted records"); - - // We do not have any records left to down-convert. Construct a "fake" message for the length remaining. + // We do not have any records left to down-convert. Construct an overflow message for the length remaining. // This message will be ignored by the consumer because its length will be past the length of maximum // possible response size. // DefaultRecordBatch => // BaseOffset => Int64 // Length => Int32 // ... - log.debug("Constructing fake message batch for partition {} for remaining length={}", topicPartition(), remaining); - ByteBuffer fakeMessageBatch = ByteBuffer.allocate(Math.max(Records.LOG_OVERHEAD, Math.min(remaining + 1, MAX_READ_SIZE))); - fakeMessageBatch.putLong(-1L); - fakeMessageBatch.putInt(remaining + 1); - convertedRecords = MemoryRecords.readableRecords(fakeMessageBatch); - } + ByteBuffer overflowMessageBatch = ByteBuffer.allocate( + Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE))); + overflowMessageBatch.putLong(-1L); + // Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch + // overhead. + overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD)); + convertedRecords = MemoryRecords.readableRecords(overflowMessageBatch); + log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining); + } convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining)); } return convertedRecordsWriter.writeTo(channel); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index ebe87ba3e27..31fa01cfc8f 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -111,6 +111,17 @@ public final class Utils { return utf8(buffer, 0, length); } + /** + * Read a UTF8 string from the current position till the end of a byte buffer. The position of the byte buffer is + * not affected by this method. + * + * @param buffer The buffer to read from + * @return The UTF8 string + */ + public static String utf8(ByteBuffer buffer) { + return utf8(buffer, buffer.remaining()); + } + /** * Read a UTF8 string from a byte buffer at a given offset. Note that the position of the byte buffer * is not affected by this method. diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index bbe84b2937c..f08652e23f9 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; import org.junit.Before; @@ -38,6 +37,7 @@ import java.util.Iterator; import java.util.List; import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.utf8; import static org.apache.kafka.test.TestUtils.tempFile; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -430,10 +430,6 @@ public class FileRecordsTest { } } - private String utf8(ByteBuffer buffer) { - return Utils.utf8(buffer, buffer.remaining()); - } - private void downConvertAndVerifyRecords(List initialRecords, List initialOffsets, FileRecords fileRecords, @@ -441,13 +437,11 @@ public class FileRecordsTest { byte toMagic, long firstOffset, Time time) { - long numBatches = 0; long minBatchSize = Long.MAX_VALUE; long maxBatchSize = Long.MIN_VALUE; for (RecordBatch batch : fileRecords.batches()) { minBatchSize = Math.min(minBatchSize, batch.sizeInBytes()); maxBatchSize = Math.max(maxBatchSize, batch.sizeInBytes()); - numBatches++; } // Test the normal down-conversion path @@ -469,21 +463,6 @@ public class FileRecordsTest { Iterator it = lazyRecords.iterator(readSize); while (it.hasNext()) convertedRecords.add(it.next().records()); - - // Check if chunking works as expected. The only way to predictably test for this is by testing the edge cases. - // 1. If maximum read size is greater than the size of all batches combined, we must get all down-conversion - // records in exactly two batches; the first chunk is pre down-converted and returned, and the second chunk - // contains the remaining batches. - // 2. If maximum read size is just smaller than the size of all batches combined, we must get results in two - // chunks. - // 3. If maximum read size is less than the size of a single record, we get one batch in each chunk. - if (readSize >= fileRecords.sizeInBytes()) - assertEquals(2, convertedRecords.size()); - else if (readSize == fileRecords.sizeInBytes() - 1) - assertEquals(2, convertedRecords.size()); - else if (readSize <= minBatchSize) - assertEquals(numBatches, convertedRecords.size()); - verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compressionType, toMagic); convertedRecords.clear(); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java index 87656038f9b..89c1aeac4c2 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -35,86 +34,126 @@ import java.util.Collection; import java.util.List; import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.utf8; import static org.apache.kafka.test.TestUtils.tempFile; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -@RunWith(value = Parameterized.class) public class LazyDownConversionRecordsTest { - private final CompressionType compressionType; - private final byte toMagic; - private final DownConversionTest test; - - public LazyDownConversionRecordsTest(CompressionType compressionType, byte toMagic, DownConversionTest test) { - this.compressionType = compressionType; - this.toMagic = toMagic; - this.test = test; - } - - enum DownConversionTest { - DEFAULT, - OVERFLOW, - } - - @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}, test={2}") - public static Collection data() { - List values = new ArrayList<>(); - for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) { - for (DownConversionTest test : DownConversionTest.values()) { - values.add(new Object[]{CompressionType.NONE, toMagic, test}); - values.add(new Object[]{CompressionType.GZIP, toMagic, test}); - } - } - return values; - } - + /** + * Test the lazy down-conversion path in the presence of commit markers. When converting to V0 or V1, these batches + * are dropped. If there happen to be no more batches left to convert, we must get an overflow message batch after + * conversion. + */ @Test - public void doTestConversion() throws IOException { - List offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L); + public void testConversionOfCommitMarker() throws IOException { + MemoryRecords recordsToConvert = MemoryRecords.withEndTransactionMarker(0, Time.SYSTEM.milliseconds(), RecordBatch.NO_PARTITION_LEADER_EPOCH, + 1, (short) 1, new EndTransactionMarker(ControlRecordType.COMMIT, 0)); + MemoryRecords convertedRecords = convertRecords(recordsToConvert, (byte) 1, recordsToConvert.sizeInBytes()); + ByteBuffer buffer = convertedRecords.buffer(); - Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()), - new RecordHeader("headerKey2", "headerValue2".getBytes()), - new RecordHeader("headerKey3", "headerValue3".getBytes())}; + // read the offset and the batch length + buffer.getLong(); + int sizeOfConvertedRecords = buffer.getInt(); - List records = asList( - new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()), - new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()), - new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()), - new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()), - new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()), - new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()), - new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()), - new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers), - new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()), - new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers)); - assertEquals("incorrect test setup", offsets.size(), records.size()); + // assert we got an overflow message batch + assertTrue(sizeOfConvertedRecords > buffer.limit()); + assertFalse(convertedRecords.batchIterator().hasNext()); + } - ByteBuffer buffer = ByteBuffer.allocate(1024); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, - TimestampType.CREATE_TIME, 0L); - for (int i = 0; i < 3; i++) - builder.appendWithOffset(offsets.get(i), records.get(i)); - builder.close(); + @RunWith(value = Parameterized.class) + public static class ParameterizedConversionTest { + private final CompressionType compressionType; + private final byte toMagic; - builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L); - for (int i = 3; i < 6; i++) - builder.appendWithOffset(offsets.get(i), records.get(i)); - builder.close(); + public ParameterizedConversionTest(CompressionType compressionType, byte toMagic) { + this.compressionType = compressionType; + this.toMagic = toMagic; + } - builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, - 0L); - for (int i = 6; i < 10; i++) - builder.appendWithOffset(offsets.get(i), records.get(i)); - builder.close(); + @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}") + public static Collection data() { + List values = new ArrayList<>(); + for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) { + values.add(new Object[]{CompressionType.NONE, toMagic}); + values.add(new Object[]{CompressionType.GZIP, toMagic}); + } + return values; + } - buffer.flip(); + /** + * Test the lazy down-conversion path. + */ + @Test + public void testConversion() throws IOException { + doTestConversion(false); + } + /** + * Test the lazy down-conversion path where the number of bytes we want to convert is much larger than the + * number of bytes we get after conversion. This causes overflow message batch(es) to be appended towards the + * end of the converted output. + */ + @Test + public void testConversionWithOverflow() throws IOException { + doTestConversion(true); + } + + private void doTestConversion(boolean testConversionOverflow) throws IOException { + List offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L); + + Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()), + new RecordHeader("headerKey2", "headerValue2".getBytes()), + new RecordHeader("headerKey3", "headerValue3".getBytes())}; + + List records = asList( + new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()), + new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()), + new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()), + new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()), + new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()), + new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()), + new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()), + new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers), + new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()), + new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers)); + assertEquals("incorrect test setup", offsets.size(), records.size()); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, + TimestampType.CREATE_TIME, 0L); + for (int i = 0; i < 3; i++) + builder.appendWithOffset(offsets.get(i), records.get(i)); + builder.close(); + + builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, + 0L); + for (int i = 3; i < 6; i++) + builder.appendWithOffset(offsets.get(i), records.get(i)); + builder.close(); + + builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, + 0L); + for (int i = 6; i < 10; i++) + builder.appendWithOffset(offsets.get(i), records.get(i)); + builder.close(); + buffer.flip(); + + MemoryRecords recordsToConvert = MemoryRecords.readableRecords(buffer); + int numBytesToConvert = recordsToConvert.sizeInBytes(); + if (testConversionOverflow) + numBytesToConvert *= 2; + + MemoryRecords convertedRecords = convertRecords(recordsToConvert, toMagic, numBytesToConvert); + verifyDownConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic); + } + } + + private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte toMagic, int bytesToConvert) throws IOException { try (FileRecords inputRecords = FileRecords.open(tempFile())) { - MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer); - inputRecords.append(memoryRecords); + inputRecords.append(recordsToConvert); inputRecords.flush(); LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(new TopicPartition("test", 1), @@ -123,50 +162,27 @@ public class LazyDownConversionRecordsTest { File outputFile = tempFile(); FileChannel channel = new RandomAccessFile(outputFile, "rw").getChannel(); - // Size of lazy records is at least as much as the size of underlying records - assertTrue(lazyRecords.sizeInBytes() >= inputRecords.sizeInBytes()); - - int toWrite; int written = 0; - List recordsBeingConverted; - List offsetsOfRecords; - switch (test) { - case DEFAULT: - toWrite = inputRecords.sizeInBytes(); - recordsBeingConverted = records; - offsetsOfRecords = offsets; - break; - case OVERFLOW: - toWrite = inputRecords.sizeInBytes() * 2; - recordsBeingConverted = records; - offsetsOfRecords = offsets; - break; - default: - throw new IllegalArgumentException(); - } - while (written < toWrite) - written += lazySend.writeTo(channel, written, toWrite - written); + while (written < bytesToConvert) + written += lazySend.writeTo(channel, written, bytesToConvert - written); FileRecords convertedRecords = FileRecords.open(outputFile, true, (int) channel.size(), false); ByteBuffer convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes()); convertedRecords.readInto(convertedRecordsBuffer, 0); - MemoryRecords convertedMemoryRecords = MemoryRecords.readableRecords(convertedRecordsBuffer); - verifyDownConvertedRecords(recordsBeingConverted, offsetsOfRecords, convertedMemoryRecords, compressionType, toMagic); + // cleanup convertedRecords.close(); channel.close(); + + return MemoryRecords.readableRecords(convertedRecordsBuffer); } } - private String utf8(ByteBuffer buffer) { - return Utils.utf8(buffer, buffer.remaining()); - } - - private void verifyDownConvertedRecords(List initialRecords, - List initialOffsets, - MemoryRecords downConvertedRecords, - CompressionType compressionType, - byte toMagic) { + private static void verifyDownConvertedRecords(List initialRecords, + List initialOffsets, + MemoryRecords downConvertedRecords, + CompressionType compressionType, + byte toMagic) { int i = 0; for (RecordBatch batch : downConvertedRecords.batches()) { assertTrue("Magic byte should be lower than or equal to " + toMagic, batch.magic() <= toMagic); diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 36b14a2f40d..5d5221eccea 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.List; import java.util.Random; +import static org.apache.kafka.common.utils.Utils.utf8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -568,10 +569,6 @@ public class MemoryRecordsBuilderTest { } } - private String utf8(ByteBuffer buffer) { - return Utils.utf8(buffer, buffer.remaining()); - } - @Test public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception { ByteBuffer buffer = ByteBuffer.allocate(128); diff --git a/docs/upgrade.html b/docs/upgrade.html index 6119536198a..89c90d19d26 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -119,6 +119,14 @@
  • KIP-290 adds the ability to define ACLs on prefixed resources, e.g. any topic starting with 'foo'.
  • +
  • KIP-283 improves message down-conversion + handling on Kafka broker, which has typically been a memory-intensive operation. The KIP adds a mechanism by which the operation becomes less memory intensive + by down-converting chunks of partition data at a time which helps put an upper bound on memory consumption. With this improvement, there is a change in + FetchResponse protocol behavior where the broker could send an oversized message batch towards the end of the response with an invalid offset. + Such oversized messages must be ignored by consumer clients, as is done by KafkaConsumer. +

    KIP-283 also adds new topic and broker configurations message.downconversion.enable and log.message.downconversion.enable respectively + to control whether down-conversion is enabled. When disabled, broker does not perform any down-conversion and instead sends an UNSUPPORTED_VERSION + error to the client.

  • New Protocol Versions