MINOR: Fix chunked down-conversion behavior when no valid batch exists after conversion (#5173)

We might decide to drop certain message batches during down-conversion because older clients might not be able to interpret them. One such example is control batches, which are typically removed by the broker when down-conversion to V0 or V1 is required. This patch makes sure the chunked down-conversion implementation can handle such cases.
Dhruvil Shah 2018-06-14 23:00:33 -07:00 committed by Jason Gustafson
parent 339fc2379d
commit a8c17e36c3
7 changed files with 172 additions and 161 deletions


@@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Time;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -57,13 +56,15 @@ public class LazyDownConversionRecords implements BaseRecords {
// need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve
// this is by having the sizeInBytes method factor in the size of the first down-converted batch and return at least
// its size.
AbstractIterator<? extends RecordBatch> it = records.batchIterator();
java.util.Iterator<ConvertedRecords> it = iterator(0);
if (it.hasNext()) {
firstConvertedBatch = RecordsUtil.downConvert(Collections.singletonList(it.peek()), toMagic, firstOffset, time);
firstConvertedBatch = it.next();
sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
} else {
// If down-conversion leaves us with no messages, make sure we are still able to send at least an overflow
// message to the consumer. Typically, the consumer would need to increase the fetch size in such cases.
firstConvertedBatch = null;
sizeInBytes = 0;
sizeInBytes = LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH;
}
}
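
For reference, a minimal sketch of the sizing rule implemented above (the helper name and parameters are illustrative, not part of the patch): the reported size must cover at least the first down-converted chunk, and falls back to the overflow-message minimum when conversion yields nothing.

// Hypothetical helper mirroring the constructor logic above; not part of the patch.
final class SizeLowerBoundSketch {
    static int lowerBoundSizeInBytes(int originalSizeInBytes,
                                     boolean hasConvertedBatch,
                                     int firstConvertedBatchSize,
                                     int minOverflowMessageLength) {
        if (hasConvertedBatch)
            // Never report less than the first down-converted chunk so it can always be sent in full.
            return Math.max(originalSizeInBytes, firstConvertedBatchSize);
        // Nothing survived conversion; reserve only enough room for an overflow message.
        return minOverflowMessageLength;
    }
}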
@@ -148,21 +149,28 @@ public class LazyDownConversionRecords implements BaseRecords {
return convertedBatch;
}
if (!batchIterator.hasNext())
return allDone();
while (batchIterator.hasNext()) {
List<RecordBatch> batches = new ArrayList<>();
boolean isFirstBatch = true;
long sizeSoFar = 0;
// Figure out batches we should down-convert based on the size constraints
List<RecordBatch> batches = new ArrayList<>();
boolean isFirstBatch = true;
long sizeSoFar = 0;
while (batchIterator.hasNext() &&
(isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
RecordBatch currentBatch = batchIterator.next();
batches.add(currentBatch);
sizeSoFar += currentBatch.sizeInBytes();
isFirstBatch = false;
// Figure out batches we should down-convert based on the size constraints
while (batchIterator.hasNext() &&
(isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
RecordBatch currentBatch = batchIterator.next();
batches.add(currentBatch);
sizeSoFar += currentBatch.sizeInBytes();
isFirstBatch = false;
}
ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
// During conversion, it is possible that we drop certain batches because they do not have an equivalent
// representation in the message format we want to convert to. For example, V0 and V1 message formats
// have no notion of transaction markers which were introduced in V2 so they get dropped during conversion.
// We return converted records only when we have at least one valid batch of messages after conversion.
if (convertedRecords.records().sizeInBytes() > 0)
return convertedRecords;
}
return RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
return allDone();
}
}
}
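
Viewed as a pattern, the new loop is "convert size-bounded chunks, skip the ones that come back empty". A minimal, generic sketch under assumed types (class and parameter names are hypothetical and not in the patch):

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration of the control flow above, not part of the patch.
final class ChunkedConversionSketch {
    // Returns the next chunk that still contains data after conversion, or null
    // (the analog of allDone()) once the input is exhausted.
    static <B, C> C nextNonEmptyConvertedChunk(Iterator<List<B>> chunks,
                                               Function<List<B>, C> downConvert,
                                               Predicate<C> isEmpty) {
        while (chunks.hasNext()) {
            C converted = downConvert.apply(chunks.next());
            if (!isEmpty.test(converted))
                return converted;   // e.g. a chunk that was not made up solely of control batches
        }
        return null;
    }
}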


@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
@@ -33,6 +32,7 @@ import java.util.Iterator;
public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownConversionRecords> {
private static final Logger log = LoggerFactory.getLogger(LazyDownConversionRecordsSend.class);
private static final int MAX_READ_SIZE = 128 * 1024;
static final int MIN_OVERFLOW_MESSAGE_LENGTH = Records.LOG_OVERHEAD;
private RecordConversionStats recordConversionStats;
private RecordsSend convertedRecordsWriter;
@@ -49,39 +49,31 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownConversionRecords> {
public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
MemoryRecords convertedRecords;
// Check if we have more chunks left to down-convert
if (convertedRecordsIterator.hasNext()) {
// Get next chunk of down-converted messages
ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
convertedRecords = recordsAndStats.records();
int sizeOfFirstConvertedBatch = convertedRecords.batchIterator().next().sizeInBytes();
if (previouslyWritten == 0 && sizeOfFirstConvertedBatch > size())
throw new EOFException("Unable to send first batch completely." +
" maximum_size: " + size() +
" converted_records_size: " + sizeOfFirstConvertedBatch);
recordConversionStats.add(recordsAndStats.recordConversionStats());
log.debug("Got lazy converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
} else {
if (previouslyWritten == 0)
throw new EOFException("Unable to get the first batch of down-converted records");
// We do not have any records left to down-convert. Construct a "fake" message for the length remaining.
// We do not have any records left to down-convert. Construct an overflow message for the length remaining.
// This message will be ignored by the consumer because its declared length extends past the maximum
// possible response size.
// DefaultRecordBatch =>
// BaseOffset => Int64
// Length => Int32
// ...
log.debug("Constructing fake message batch for partition {} for remaining length={}", topicPartition(), remaining);
ByteBuffer fakeMessageBatch = ByteBuffer.allocate(Math.max(Records.LOG_OVERHEAD, Math.min(remaining + 1, MAX_READ_SIZE)));
fakeMessageBatch.putLong(-1L);
fakeMessageBatch.putInt(remaining + 1);
convertedRecords = MemoryRecords.readableRecords(fakeMessageBatch);
}
ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
overflowMessageBatch.putLong(-1L);
// Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
// overhead.
overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
convertedRecords = MemoryRecords.readableRecords(overflowMessageBatch);
log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
}
convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
}
return convertedRecordsWriter.writeTo(channel);
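
A hedged, standalone rendering of the overflow batch constructed above (class name and parameters are illustrative; this is a sketch, not the broker implementation): the header's declared length always reaches past what remains of the response, so the consumer drops the batch as incomplete.

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;

// Sketch of an overflow batch: an offset + length header followed by zero padding.
final class OverflowBatchSketch {
    static MemoryRecords overflowBatch(int remaining, int maxReadSize) {
        ByteBuffer buffer = ByteBuffer.allocate(
                Math.max(Records.LOG_OVERHEAD, Math.min(remaining + 1, maxReadSize)));
        buffer.putLong(-1L);                                // baseOffset; never used by the consumer
        buffer.putInt(Math.max(remaining + 1,               // declared length: past the end of the response,
                DefaultRecordBatch.RECORD_BATCH_OVERHEAD)); // yet at least a plausible batch size
        return MemoryRecords.readableRecords(buffer);
    }
}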


@@ -111,6 +111,17 @@ public final class Utils {
return utf8(buffer, 0, length);
}
/**
* Read a UTF8 string from the current position till the end of a byte buffer. The position of the byte buffer is
* not affected by this method.
*
* @param buffer The buffer to read from
* @return The UTF8 string
*/
public static String utf8(ByteBuffer buffer) {
return utf8(buffer, buffer.remaining());
}
/**
* Read a UTF8 string from a byte buffer at a given offset. Note that the position of the byte buffer
* is not affected by this method.
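
A small usage sketch for the new overload (the example class name is made up for illustration):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

// Assumed usage of Utils.utf8(ByteBuffer): decode from the current position to the
// end of the buffer without moving the position.
public class Utf8Example {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        String decoded = Utils.utf8(buffer);                    // "hello"
        System.out.println(decoded + " " + buffer.position());  // prints "hello 0"
    }
}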


@@ -22,7 +22,6 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Before;
@@ -38,6 +37,7 @@ import java.util.Iterator;
import java.util.List;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.utf8;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -430,10 +430,6 @@ public class FileRecordsTest {
}
}
private String utf8(ByteBuffer buffer) {
return Utils.utf8(buffer, buffer.remaining());
}
private void downConvertAndVerifyRecords(List<SimpleRecord> initialRecords,
List<Long> initialOffsets,
FileRecords fileRecords,
@@ -441,13 +437,11 @@
byte toMagic,
long firstOffset,
Time time) {
long numBatches = 0;
long minBatchSize = Long.MAX_VALUE;
long maxBatchSize = Long.MIN_VALUE;
for (RecordBatch batch : fileRecords.batches()) {
minBatchSize = Math.min(minBatchSize, batch.sizeInBytes());
maxBatchSize = Math.max(maxBatchSize, batch.sizeInBytes());
numBatches++;
}
// Test the normal down-conversion path
@@ -469,21 +463,6 @@
Iterator<ConvertedRecords> it = lazyRecords.iterator(readSize);
while (it.hasNext())
convertedRecords.add(it.next().records());
// Check if chunking works as expected. The only way to predictably test for this is by testing the edge cases.
// 1. If maximum read size is greater than the size of all batches combined, we must get all down-conversion
// records in exactly two batches; the first chunk is pre down-converted and returned, and the second chunk
// contains the remaining batches.
// 2. If maximum read size is just smaller than the size of all batches combined, we must get results in two
// chunks.
// 3. If maximum read size is less than the size of a single record, we get one batch in each chunk.
if (readSize >= fileRecords.sizeInBytes())
assertEquals(2, convertedRecords.size());
else if (readSize == fileRecords.sizeInBytes() - 1)
assertEquals(2, convertedRecords.size());
else if (readSize <= minBatchSize)
assertEquals(numBatches, convertedRecords.size());
verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compressionType, toMagic);
convertedRecords.clear();
}


@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -35,86 +34,126 @@ import java.util.Collection;
import java.util.List;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.utf8;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
public class LazyDownConversionRecordsTest {
private final CompressionType compressionType;
private final byte toMagic;
private final DownConversionTest test;
public LazyDownConversionRecordsTest(CompressionType compressionType, byte toMagic, DownConversionTest test) {
this.compressionType = compressionType;
this.toMagic = toMagic;
this.test = test;
}
enum DownConversionTest {
DEFAULT,
OVERFLOW,
}
@Parameterized.Parameters(name = "compressionType={0}, toMagic={1}, test={2}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
for (DownConversionTest test : DownConversionTest.values()) {
values.add(new Object[]{CompressionType.NONE, toMagic, test});
values.add(new Object[]{CompressionType.GZIP, toMagic, test});
}
}
return values;
}
/**
* Test the lazy down-conversion path in the presence of commit markers. When converting to V0 or V1, these batches
* are dropped. If there happen to be no more batches left to convert, we must get an overflow message batch after
* conversion.
*/
@Test
public void doTestConversion() throws IOException {
List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
public void testConversionOfCommitMarker() throws IOException {
MemoryRecords recordsToConvert = MemoryRecords.withEndTransactionMarker(0, Time.SYSTEM.milliseconds(), RecordBatch.NO_PARTITION_LEADER_EPOCH,
1, (short) 1, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
MemoryRecords convertedRecords = convertRecords(recordsToConvert, (byte) 1, recordsToConvert.sizeInBytes());
ByteBuffer buffer = convertedRecords.buffer();
Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
new RecordHeader("headerKey2", "headerValue2".getBytes()),
new RecordHeader("headerKey3", "headerValue3".getBytes())};
// read the offset and the batch length
buffer.getLong();
int sizeOfConvertedRecords = buffer.getInt();
List<SimpleRecord> records = asList(
new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
assertEquals("incorrect test setup", offsets.size(), records.size());
// assert we got an overflow message batch
assertTrue(sizeOfConvertedRecords > buffer.limit());
assertFalse(convertedRecords.batchIterator().hasNext());
}
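
In the same spirit as the assertions above, a hypothetical reader-side check (names are illustrative, not from the patch) for spotting an overflow batch whose declared length runs past the readable bytes:

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.Records;

// Sketch only: detect a batch header whose declared length exceeds the bytes available.
final class OverflowCheckSketch {
    static boolean isOverflowBatch(ByteBuffer response) {
        ByteBuffer view = response.duplicate();        // leave the caller's position untouched
        if (view.remaining() < Records.LOG_OVERHEAD)
            return true;                               // not even a complete offset + length header
        view.getLong();                                // baseOffset (ignored)
        int batchLength = view.getInt();
        return batchLength > view.remaining();         // declared length extends past the data we have
    }
}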
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
TimestampType.CREATE_TIME, 0L);
for (int i = 0; i < 3; i++)
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
@RunWith(value = Parameterized.class)
public static class ParameterizedConversionTest {
private final CompressionType compressionType;
private final byte toMagic;
builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
0L);
for (int i = 3; i < 6; i++)
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
public ParameterizedConversionTest(CompressionType compressionType, byte toMagic) {
this.compressionType = compressionType;
this.toMagic = toMagic;
}
builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
0L);
for (int i = 6; i < 10; i++)
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
@Parameterized.Parameters(name = "compressionType={0}, toMagic={1}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
values.add(new Object[]{CompressionType.NONE, toMagic});
values.add(new Object[]{CompressionType.GZIP, toMagic});
}
return values;
}
buffer.flip();
/**
* Test the lazy down-conversion path.
*/
@Test
public void testConversion() throws IOException {
doTestConversion(false);
}
/**
* Test the lazy down-conversion path where the number of bytes we want to convert is much larger than the
* number of bytes we get after conversion. This causes overflow message batch(es) to be appended towards the
* end of the converted output.
*/
@Test
public void testConversionWithOverflow() throws IOException {
doTestConversion(true);
}
private void doTestConversion(boolean testConversionOverflow) throws IOException {
List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
new RecordHeader("headerKey2", "headerValue2".getBytes()),
new RecordHeader("headerKey3", "headerValue3".getBytes())};
List<SimpleRecord> records = asList(
new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
assertEquals("incorrect test setup", offsets.size(), records.size());
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
TimestampType.CREATE_TIME, 0L);
for (int i = 0; i < 3; i++)
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
0L);
for (int i = 3; i < 6; i++)
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
0L);
for (int i = 6; i < 10; i++)
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
buffer.flip();
MemoryRecords recordsToConvert = MemoryRecords.readableRecords(buffer);
int numBytesToConvert = recordsToConvert.sizeInBytes();
if (testConversionOverflow)
numBytesToConvert *= 2;
MemoryRecords convertedRecords = convertRecords(recordsToConvert, toMagic, numBytesToConvert);
verifyDownConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
}
}
private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte toMagic, int bytesToConvert) throws IOException {
try (FileRecords inputRecords = FileRecords.open(tempFile())) {
MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
inputRecords.append(memoryRecords);
inputRecords.append(recordsToConvert);
inputRecords.flush();
LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(new TopicPartition("test", 1),
@@ -123,50 +162,27 @@ public class LazyDownConversionRecordsTest {
File outputFile = tempFile();
FileChannel channel = new RandomAccessFile(outputFile, "rw").getChannel();
// Size of lazy records is at least as much as the size of underlying records
assertTrue(lazyRecords.sizeInBytes() >= inputRecords.sizeInBytes());
int toWrite;
int written = 0;
List<SimpleRecord> recordsBeingConverted;
List<Long> offsetsOfRecords;
switch (test) {
case DEFAULT:
toWrite = inputRecords.sizeInBytes();
recordsBeingConverted = records;
offsetsOfRecords = offsets;
break;
case OVERFLOW:
toWrite = inputRecords.sizeInBytes() * 2;
recordsBeingConverted = records;
offsetsOfRecords = offsets;
break;
default:
throw new IllegalArgumentException();
}
while (written < toWrite)
written += lazySend.writeTo(channel, written, toWrite - written);
while (written < bytesToConvert)
written += lazySend.writeTo(channel, written, bytesToConvert - written);
FileRecords convertedRecords = FileRecords.open(outputFile, true, (int) channel.size(), false);
ByteBuffer convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes());
convertedRecords.readInto(convertedRecordsBuffer, 0);
MemoryRecords convertedMemoryRecords = MemoryRecords.readableRecords(convertedRecordsBuffer);
verifyDownConvertedRecords(recordsBeingConverted, offsetsOfRecords, convertedMemoryRecords, compressionType, toMagic);
// cleanup
convertedRecords.close();
channel.close();
return MemoryRecords.readableRecords(convertedRecordsBuffer);
}
}
private String utf8(ByteBuffer buffer) {
return Utils.utf8(buffer, buffer.remaining());
}
private void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
List<Long> initialOffsets,
MemoryRecords downConvertedRecords,
CompressionType compressionType,
byte toMagic) {
private static void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
List<Long> initialOffsets,
MemoryRecords downConvertedRecords,
CompressionType compressionType,
byte toMagic) {
int i = 0;
for (RecordBatch batch : downConvertedRecords.batches()) {
assertTrue("Magic byte should be lower than or equal to " + toMagic, batch.magic() <= toMagic);


@@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Random;
import static org.apache.kafka.common.utils.Utils.utf8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -568,10 +569,6 @@ public class MemoryRecordsBuilderTest {
}
}
private String utf8(ByteBuffer buffer) {
return Utils.utf8(buffer, buffer.remaining());
}
@Test
public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(128);


@@ -119,6 +119,14 @@
</li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a> adds the ability
to define ACLs on prefixed resources, e.g. any topic starting with 'foo'.</li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion">KIP-283</a> improves message down-conversion
handling on the Kafka broker, which has typically been a memory-intensive operation. The KIP adds a mechanism that makes the operation less memory intensive
by down-converting chunks of partition data at a time, which puts an upper bound on memory consumption. With this improvement, there is a change in
<code>FetchResponse</code> protocol behavior: the broker may send an oversized message batch with an invalid offset towards the end of the response.
Such oversized messages must be ignored by consumer clients, as is done by <code>KafkaConsumer</code>.
<p>KIP-283 also adds new topic and broker configurations, <code>message.downconversion.enable</code> and <code>log.message.downconversion.enable</code> respectively,
to control whether down-conversion is enabled. When disabled, the broker does not perform any down-conversion and instead sends an <code>UNSUPPORTED_VERSION</code>
error to the client (an illustrative example of changing the topic-level setting follows this list).</p></li>
</ul>
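<p>As an illustrative, non-normative example, the topic-level override can be changed with the Java <code>AdminClient</code>; the bootstrap address and topic name below are placeholders, and error handling is omitted.</p>
<pre><code>Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // Disable down-conversion for a single topic (placeholder name "my-topic").
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
    Config config = new Config(Collections.singletonList(
            new ConfigEntry("message.downconversion.enable", "false")));
    admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
}</code></pre>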
<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>