KAFKA-8106: Skipping ByteBuffer allocation of key / value / headers in logValidator (#6785)

* KAFKA-8106: Reducing the allocation and copying of ByteBuffer when logValidator does validation.

* KAFKA-8106: Reducing the allocation and copying of ByteBuffer when logValidator does validation.

* github comments

* use batch.skipKeyValueIterator

* cleanups

* no need to skip kv for uncompressed iterator

* checkstyle fixes

* fix findbugs

* adding unit tests

* reuse decompression buffer; and using streaming iterator

* checkstyle

* add unit tests

* remove reusing buffer supplier

* fix unit tests

* add unit tests

* use streaming iterator

* minor refactoring

* rename

* github comments

* github comments

* reuse buffer at DefaultRecord caller

* some further optimization

* major refactoring

* further refactoring

* update comment

* github comments

* minor fix

* add jmh benchmarks

* update jmh

* github comments

* minor fix

* github comments
Guozhang Wang 2019-06-21 12:44:45 -07:00 committed by GitHub
parent 11b25a13ee
commit 3e9d1c1411
16 changed files with 816 additions and 132 deletions
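
At a high level, the change adds a skipKeyValueIterator to record batches so that the broker-side LogValidator can walk compressed v2 batches without allocating a ByteBuffer for each record's key, value and headers when it only needs sizes, offsets and timestamps. A minimal usage sketch of the new iterator (the class name SkipKeyValueExample is made up for illustration; the calls mirror the API introduced in the diffs and tests below):

import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CloseableIterator;

public class SkipKeyValueExample {
    public static void main(String[] args) {
        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
            CompressionType.LZ4, TimestampType.CREATE_TIME,
            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()));

        for (MutableRecordBatch batch : records.batches()) {
            // Records come back as PartialDefaultRecord: sizes and metadata are populated,
            // but key()/value()/headers() are never parsed or allocated.
            try (CloseableIterator<Record> iterator = batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)) {
                while (iterator.hasNext()) {
                    Record record = iterator.next();
                    System.out.println(record.offset() + ": keySize=" + record.keySize()
                        + ", valueSize=" + record.valueSize());
                }
            }
        }
    }
}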

TransactionManager.java

@@ -54,6 +54,7 @@ import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.PrimitiveRef;
 import org.slf4j.Logger;
 import java.util.ArrayList;
@@ -68,7 +69,6 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -655,15 +655,15 @@ public class TransactionManager {
     }
     private void startSequencesAtBeginning(TopicPartition topicPartition) {
-        final AtomicInteger sequence = new AtomicInteger(0);
+        final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
         topicPartitionBookkeeper.getPartition(topicPartition).resetSequenceNumbers(inFlightBatch -> {
             log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}",
-                    inFlightBatch.baseSequence(), inFlightBatch.topicPartition, sequence.get());
+                    inFlightBatch.baseSequence(), inFlightBatch.topicPartition, sequence.value);
             inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(),
-                    inFlightBatch.producerEpoch()), sequence.get(), inFlightBatch.isTransactional());
+                    inFlightBatch.producerEpoch()), sequence.value, inFlightBatch.isTransactional());
-            sequence.getAndAdd(inFlightBatch.recordCount);
+            sequence.value += inFlightBatch.recordCount;
         });
-        setNextSequence(topicPartition, sequence.get());
+        setNextSequence(topicPartition, sequence.value);
         topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
     }

AbstractLegacyRecordBatch.java

@@ -227,7 +227,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
         return iterator(BufferSupplier.NO_CACHING);
     }
-    private CloseableIterator<Record> iterator(BufferSupplier bufferSupplier) {
+    CloseableIterator<Record> iterator(BufferSupplier bufferSupplier) {
         if (isCompressed())
             return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier);
@@ -503,6 +503,16 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
         ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + LegacyRecord.CRC_OFFSET, crc);
     }
+    /**
+     * LegacyRecordBatch does not implement this iterator and would hence fallback to the normal iterator.
+     *
+     * @return An iterator over the records contained within this batch
+     */
+    @Override
+    public CloseableIterator<Record> skipKeyValueIterator(BufferSupplier bufferSupplier) {
+        return CloseableIterator.wrap(iterator(bufferSupplier));
+    }
     @Override
     public void writeTo(ByteBufferOutputStream outputStream) {
         outputStream.write(buffer.duplicate());

AbstractRecordBatch.java

@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.common.record;
-abstract class AbstractRecordBatch implements RecordBatch {
+abstract class AbstractRecordBatch implements RecordBatch {
     @Override
     public boolean hasProducerId() {
         return RecordBatch.NO_PRODUCER_ID < producerId();

AbstractRecords.java

@@ -43,13 +43,13 @@ public abstract class AbstractRecords implements Records {
         return true;
     }
-    public boolean firstBatchHasCompatibleMagic(byte magic) {
+    public RecordBatch firstBatch() {
         Iterator<? extends RecordBatch> iterator = batches().iterator();
         if (!iterator.hasNext())
-            return true;
-        return iterator.next().magic() <= magic;
+            return null;
+        return iterator.next();
     }
     /**

DefaultRecord.java

@@ -21,6 +21,8 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Checksums;
 import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.PrimitiveRef.IntRef;
 import org.apache.kafka.common.utils.Utils;
 import java.io.DataInput;
@@ -78,14 +80,14 @@ public class DefaultRecord implements Record {
     private final ByteBuffer value;
     private final Header[] headers;
-    private DefaultRecord(int sizeInBytes,
+    DefaultRecord(int sizeInBytes,
                   byte attributes,
                   long offset,
                   long timestamp,
                   int sequence,
                   ByteBuffer key,
                   ByteBuffer value,
                   Header[] headers) {
         this.sizeInBytes = sizeInBytes;
         this.attributes = attributes;
         this.offset = offset;
@@ -370,6 +372,161 @@ public class DefaultRecord implements Record {
         }
     }
+    public static PartialDefaultRecord readPartiallyFrom(DataInput input,
+                                                         byte[] skipArray,
+                                                         long baseOffset,
+                                                         long baseTimestamp,
+                                                         int baseSequence,
+                                                         Long logAppendTime) throws IOException {
+        int sizeOfBodyInBytes = ByteUtils.readVarint(input);
+        int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
+        return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
+            baseSequence, logAppendTime);
+    }
+    private static PartialDefaultRecord readPartiallyFrom(DataInput input,
+                                                          byte[] skipArray,
+                                                          int sizeInBytes,
+                                                          int sizeOfBodyInBytes,
+                                                          long baseOffset,
+                                                          long baseTimestamp,
+                                                          int baseSequence,
+                                                          Long logAppendTime) throws IOException {
+        ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray);
+        // set its limit to 0 to indicate no bytes readable yet
+        skipBuffer.limit(0);
+        try {
+            // reading the attributes / timestamp / offset and key-size does not require
+            // any byte array allocation and therefore we can just read them straight-forwardly
+            IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes);
+            byte attributes = readByte(skipBuffer, input, bytesRemaining);
+            long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining);
+            long timestamp = baseTimestamp + timestampDelta;
+            if (logAppendTime != null)
+                timestamp = logAppendTime;
+            int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining);
+            long offset = baseOffset + offsetDelta;
+            int sequence = baseSequence >= 0 ?
+                DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
+                RecordBatch.NO_SEQUENCE;
+            // first skip key
+            int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // then skip value
+            int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            // then skip header
+            int numHeaders = readVarInt(skipBuffer, input, bytesRemaining);
+            if (numHeaders < 0)
+                throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
+            for (int i = 0; i < numHeaders; i++) {
+                int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+                if (headerKeySize < 0)
+                    throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
+                // headerValueSize
+                skipLengthDelimitedField(skipBuffer, input, bytesRemaining);
+            }
+            if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0)
+                throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes +
+                    " bytes in record payload, but there are still bytes remaining");
+            return new PartialDefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, keySize, valueSize);
+        } catch (BufferUnderflowException | IllegalArgumentException e) {
+            throw new InvalidRecordException("Found invalid record structure", e);
+        }
+    }
+    private static byte readByte(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
+        if (buffer.remaining() < 1 && bytesRemaining.value > 0) {
+            readMore(buffer, input, bytesRemaining);
+        }
+        return buffer.get();
+    }
+    private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
+        if (buffer.remaining() < 10 && bytesRemaining.value > 0) {
+            readMore(buffer, input, bytesRemaining);
+        }
+        return ByteUtils.readVarlong(buffer);
+    }
+    private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
+        if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
+            readMore(buffer, input, bytesRemaining);
+        }
+        return ByteUtils.readVarint(buffer);
+    }
+    private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
+        boolean needMore = false;
+        int sizeInBytes = -1;
+        int bytesToSkip = -1;
+        while (true) {
+            if (needMore) {
+                readMore(buffer, input, bytesRemaining);
+                needMore = false;
+            }
+            if (bytesToSkip < 0) {
+                if (buffer.remaining() < 5 && bytesRemaining.value > 0) {
+                    needMore = true;
+                } else {
+                    sizeInBytes = ByteUtils.readVarint(buffer);
+                    if (sizeInBytes <= 0)
+                        return sizeInBytes;
+                    else
+                        bytesToSkip = sizeInBytes;
+                }
+            } else {
+                if (bytesToSkip > buffer.remaining()) {
+                    bytesToSkip -= buffer.remaining();
+                    buffer.position(buffer.limit());
+                    needMore = true;
+                } else {
+                    buffer.position(buffer.position() + bytesToSkip);
+                    return sizeInBytes;
+                }
+            }
+        }
+    }
+    private static void readMore(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException {
+        if (bytesRemaining.value > 0) {
+            byte[] array = buffer.array();
+            // first copy the remaining bytes to the beginning of the array;
+            // at most 4 bytes would be shifted here
+            int stepsToLeftShift = buffer.position();
+            int bytesToLeftShift = buffer.remaining();
+            for (int i = 0; i < bytesToLeftShift; i++) {
+                array[i] = array[i + stepsToLeftShift];
+            }
+            // then try to read more bytes to the remaining of the array
+            int bytesRead = Math.min(bytesRemaining.value, array.length - bytesToLeftShift);
+            input.readFully(array, bytesToLeftShift, bytesRead);
+            buffer.rewind();
+            // only those many bytes are readable
+            buffer.limit(bytesToLeftShift + bytesRead);
+            bytesRemaining.value -= bytesRead;
+        } else {
+            throw new InvalidRecordException("Invalid record size: expected to read more bytes in record payload");
+        }
+    }
     private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) {
         Header[] headers = new Header[numHeaders];
         for (int i = 0; i < numHeaders; i++) {

DefaultRecordBatch.java

@@ -128,6 +128,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     private static final int CONTROL_FLAG_MASK = 0x20;
     private static final byte TIMESTAMP_TYPE_MASK = 0x08;
+    private static final int MAX_SKIP_BUFFER_SIZE = 2048;
     private final ByteBuffer buffer;
     DefaultRecordBatch(ByteBuffer buffer) {
@@ -251,42 +253,30 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
     }
-    private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
+    private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
         final DataInputStream inputStream = new DataInputStream(compressionType().wrapForInput(buffer, magic(),
             bufferSupplier));
-        return new RecordIterator() {
-            @Override
-            protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
-                try {
-                    return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime);
-                } catch (EOFException e) {
-                    throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
-                } catch (IOException e) {
-                    throw new KafkaException("Failed to decompress record stream", e);
-                }
-            }
-            @Override
-            protected boolean ensureNoneRemaining() {
-                try {
-                    return inputStream.read() == -1;
-                } catch (IOException e) {
-                    throw new KafkaException("Error checking for remaining bytes after reading batch", e);
-                }
-            }
-            @Override
-            public void close() {
-                try {
-                    inputStream.close();
-                } catch (IOException e) {
-                    throw new KafkaException("Failed to close record stream", e);
-                }
-            }
-        };
+        if (skipKeyValue) {
+            // this buffer is used to skip length delimited fields like key, value, headers
+            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            return new StreamRecordIterator(inputStream) {
+                @Override
+                protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
+                    return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+                }
+            };
+        } else {
+            return new StreamRecordIterator(inputStream) {
+                @Override
+                protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
+                    return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+                }
+            };
+        }
     }
     private CloseableIterator<Record> uncompressedIterator() {
@@ -321,7 +311,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
         // so we decompress the full record set here. Use cases which call for a lower memory footprint
         // can use `streamingIterator` at the cost of additional complexity
-        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING)) {
+        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) {
             List<Record> records = new ArrayList<>(count());
             while (iterator.hasNext())
                 records.add(iterator.next());
@@ -329,10 +319,29 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         }
     }
+    @Override
+    public CloseableIterator<Record> skipKeyValueIterator(BufferSupplier bufferSupplier) {
+        if (count() == 0) {
+            return CloseableIterator.wrap(Collections.emptyIterator());
+        }
+        /*
+         * For uncompressed iterator, it is actually not worth skipping key / value / headers at all since
+         * its ByteBufferInputStream's skip() function is less efficient compared with just reading it actually
+         * as it will allocate new byte array.
+         */
+        if (!isCompressed())
+            return uncompressedIterator();
+        // we define this to be a closable iterator so that caller (i.e. the log validator) needs to close it
+        // while we can save memory footprint of not decompressing the full record set ahead of time
+        return compressedIterator(bufferSupplier, true);
+    }
     @Override
     public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
         if (isCompressed())
-            return compressedIterator(bufferSupplier);
+            return compressedIterator(bufferSupplier, false);
         else
             return uncompressedIterator();
     }
@@ -543,7 +552,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         private final int numRecords;
         private int readRecords = 0;
-        public RecordIterator() {
+        RecordIterator() {
            this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
            this.baseOffset = baseOffset();
            this.firstTimestamp = firstTimestamp();
@@ -588,6 +597,46 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         }
+    private abstract class StreamRecordIterator extends RecordIterator {
+        private final DataInputStream inputStream;
+        StreamRecordIterator(DataInputStream inputStream) {
+            super();
+            this.inputStream = inputStream;
+        }
+        abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException;
+        @Override
+        protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
+            try {
+                return doReadRecord(baseOffset, firstTimestamp, baseSequence, logAppendTime);
+            } catch (EOFException e) {
+                throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
+            } catch (IOException e) {
+                throw new KafkaException("Failed to decompress record stream", e);
+            }
+        }
+        @Override
+        protected boolean ensureNoneRemaining() {
+            try {
+                return inputStream.read() == -1;
+            } catch (IOException e) {
+                throw new KafkaException("Error checking for remaining bytes after reading batch", e);
+            }
+        }
+        @Override
+        public void close() {
+            try {
+                inputStream.close();
+            } catch (IOException e) {
+                throw new KafkaException("Failed to close record stream", e);
+            }
+        }
+    }
     static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {
         DefaultFileChannelRecordBatch(long offset,

MemoryRecordsBuilder.java

@@ -418,7 +418,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
                 appendDefaultRecord(offset, timestamp, key, value, headers);
                 return null;
             } else {
-                return appendLegacyRecord(offset, timestamp, key, value);
+                return appendLegacyRecord(offset, timestamp, key, value, magic);
             }
         } catch (IOException e) {
             throw new KafkaException("I/O exception when writing to the append stream, closing", e);
@@ -632,7 +632,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
         recordWritten(offset, timestamp, sizeInBytes);
     }
-    private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
+    private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, byte magic) throws IOException {
         ensureOpenForRecordAppend();
         if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME)
             timestamp = logAppendTime;

MutableRecordBatch.java

@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.CloseableIterator;
 /**
  * A mutable record batch is one that can be modified in place (without copying). This is used by the broker
@@ -55,4 +56,12 @@ public interface MutableRecordBatch extends RecordBatch {
      */
     void writeTo(ByteBufferOutputStream outputStream);
+    /**
+     * Return an iterator which skips parsing key, value and headers from the record stream, and therefore the resulted
+     * {@code org.apache.kafka.common.record.Record}'s key and value fields would be empty. This iterator is used
+     * when the read record's key and value are not needed and hence can save some byte buffer allocating / GC overhead.
+     *
+     * @return The closeable iterator
+     */
+    CloseableIterator<Record> skipKeyValueIterator(BufferSupplier bufferSupplier);
 }

PartialDefaultRecord.java (new file)

@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+import org.apache.kafka.common.header.Header;
+import java.nio.ByteBuffer;
+public class PartialDefaultRecord extends DefaultRecord {
+    private final int keySize;
+    private final int valueSize;
+    PartialDefaultRecord(int sizeInBytes,
+                         byte attributes,
+                         long offset,
+                         long timestamp,
+                         int sequence,
+                         int keySize,
+                         int valueSize) {
+        super(sizeInBytes, attributes, offset, timestamp, sequence, null, null, null);
+        this.keySize = keySize;
+        this.valueSize = valueSize;
+    }
+    @Override
+    public boolean equals(Object o) {
+        return super.equals(o) &&
+            this.keySize == ((PartialDefaultRecord) o).keySize &&
+            this.valueSize == ((PartialDefaultRecord) o).valueSize;
+    }
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + keySize;
+        result = 31 * result + valueSize;
+        return result;
+    }
+    @Override
+    public String toString() {
+        return String.format("PartialDefaultRecord(offset=%d, timestamp=%d, key=%d bytes, value=%d bytes)",
+            offset(),
+            timestamp(),
+            keySize,
+            valueSize);
+    }
+    @Override
+    public int keySize() {
+        return keySize;
+    }
+    @Override
+    public boolean hasKey() {
+        return keySize >= 0;
+    }
+    @Override
+    public ByteBuffer key() {
+        throw new UnsupportedOperationException("key is skipped in PartialDefaultRecord");
+    }
+    @Override
+    public int valueSize() {
+        return valueSize;
+    }
+    @Override
+    public boolean hasValue() {
+        return valueSize >= 0;
+    }
+    @Override
+    public ByteBuffer value() {
+        throw new UnsupportedOperationException("value is skipped in PartialDefaultRecord");
+    }
+    @Override
+    public Header[] headers() {
+        throw new UnsupportedOperationException("headers is skipped in PartialDefaultRecord");
+    }
+}

CloseableIterator.java

@@ -27,4 +27,26 @@ import java.util.Iterator;
  */
 public interface CloseableIterator<T> extends Iterator<T>, Closeable {
     void close();
+    static <R> CloseableIterator<R> wrap(Iterator<R> inner) {
+        return new CloseableIterator<R>() {
+            @Override
+            public void close() {}
+            @Override
+            public boolean hasNext() {
+                return inner.hasNext();
+            }
+            @Override
+            public R next() {
+                return inner.next();
+            }
+            @Override
+            public void remove() {
+                inner.remove();
+            }
+        };
+    }
 }

PrimitiveRef.java (new file)

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+/**
+ * Primitive reference used to pass primitive typed values as parameter-by-reference.
+ *
+ * This is cheaper than using Atomic references.
+ */
+public class PrimitiveRef {
+    public static IntRef ofInt(int value) {
+        return new IntRef(value);
+    }
+    public static class IntRef {
+        public int value;
+        IntRef(int value) {
+            this.value = value;
+        }
+    }
+}
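
Since PrimitiveRef.IntRef is just a public mutable field, the TransactionManager change above can accumulate the next sequence number inside a lambda without the volatile reads and CAS of AtomicInteger. A small illustrative sketch of that pattern (the surrounding class and the recordCounts array are made up for this example):

import org.apache.kafka.common.utils.PrimitiveRef;

public class PrimitiveRefExample {
    public static void main(String[] args) {
        // start the sequence at the beginning, then advance it by each in-flight batch's record count
        PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
        int[] recordCounts = {3, 5, 2};
        for (int recordCount : recordCounts) {
            // plain field update; no atomic operations are needed on this single-threaded path
            sequence.value += recordCount;
        }
        System.out.println("next sequence = " + sequence.value); // prints 10
    }
}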

DefaultRecordBatchTest.java

@@ -375,6 +375,32 @@ public class DefaultRecordBatchTest {
         }
     }
+    @Test
+    public void testSkipKeyValueIteratorCorrectness() {
+        Header[] headers = {new RecordHeader("k1", "v1".getBytes()), new RecordHeader("k2", "v2".getBytes())};
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
+            CompressionType.LZ4, TimestampType.CREATE_TIME,
+            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+            new SimpleRecord(3L, "c".getBytes(), "3".getBytes()),
+            new SimpleRecord(1000L, "abc".getBytes(), "0".getBytes()),
+            new SimpleRecord(9999L, "abc".getBytes(), "0".getBytes(), headers)
+        );
+        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
+        try (CloseableIterator<Record> streamingIterator = batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)) {
+            assertEquals(Arrays.asList(
+                new PartialDefaultRecord(9, (byte) 0, 0L, 1L, -1, 1, 1),
+                new PartialDefaultRecord(9, (byte) 0, 1L, 2L, -1, 1, 1),
+                new PartialDefaultRecord(9, (byte) 0, 2L, 3L, -1, 1, 1),
+                new PartialDefaultRecord(12, (byte) 0, 3L, 1000L, -1, 3, 1),
+                new PartialDefaultRecord(25, (byte) 0, 4L, 9999L, -1, 3, 1)
+                ),
+                Utils.toList(streamingIterator)
+            );
+        }
+    }
     @Test
     public void testIncrementSequence() {
         assertEquals(10, DefaultRecordBatch.incrementSequence(5, 5));

DefaultRecordTest.java

@@ -18,10 +18,13 @@ package org.apache.kafka.common.record;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.ByteUtils;
+import org.junit.Before;
 import org.junit.Test;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -32,6 +35,13 @@ import static org.junit.Assert.assertNotNull;
 public class DefaultRecordTest {
+    private byte[] skipArray;
+    @Before
+    public void setUp() {
+        skipArray = new byte[64];
+    }
     @Test
     public void testBasicSerde() throws IOException {
         Header[] headers = new Header[] {
@@ -152,6 +162,27 @@ public class DefaultRecordTest {
         DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
     }
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidKeySizePartial() throws IOException {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        int keySize = 105; // use a key size larger than the full message
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(keySize, buf);
+        buf.position(buf.limit());
+        buf.flip();
+        DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf));
+        DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
     @Test(expected = InvalidRecordException.class)
     public void testInvalidValueSize() throws IOException {
         byte attributes = 0;
@@ -173,6 +204,210 @@ public class DefaultRecordTest {
         DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
     }
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidValueSizePartial() throws IOException {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        int valueSize = 105; // use a value size larger than the full message
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(valueSize, buf);
+        buf.position(buf.limit());
+        buf.flip();
+        DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf));
+        DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidNumHeaders() throws IOException {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(-1, buf); // null value
+        ByteUtils.writeVarint(-1, buf); // -1 num.headers, not allowed
+        buf.position(buf.limit());
+        buf.flip();
+        DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidNumHeadersPartial() throws IOException {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(-1, buf); // null value
+        ByteUtils.writeVarint(-1, buf); // -1 num.headers, not allowed
+        buf.position(buf.limit());
+        buf.flip();
+        DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf));
+        DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+    @Test(expected = StringIndexOutOfBoundsException.class)
+    public void testInvalidHeaderKey() {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(-1, buf); // null value
+        ByteUtils.writeVarint(1, buf);
+        ByteUtils.writeVarint(105, buf); // header key too long
+        buf.position(buf.limit());
+        buf.flip();
+        DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidHeaderKeyPartial() throws IOException {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(-1, buf); // null value
+        ByteUtils.writeVarint(1, buf);
+        ByteUtils.writeVarint(105, buf); // header key too long
+        buf.position(buf.limit());
+        buf.flip();
+        DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf));
+        DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+    @Test(expected = InvalidRecordException.class)
+    public void testNullHeaderKey() {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(-1, buf); // null value
+        ByteUtils.writeVarint(1, buf);
+        ByteUtils.writeVarint(-1, buf); // null header key not allowed
+        buf.position(buf.limit());
+        buf.flip();
+        DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+    @Test(expected = InvalidRecordException.class)
+    public void testNullHeaderKeyPartial() throws IOException {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(-1, buf); // null value
+        ByteUtils.writeVarint(1, buf);
+        ByteUtils.writeVarint(-1, buf); // null header key not allowed
+        buf.position(buf.limit());
+        buf.flip();
+        DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf));
+        DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidHeaderValue() {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(-1, buf); // null value
+        ByteUtils.writeVarint(1, buf);
+        ByteUtils.writeVarint(1, buf);
+        buf.put((byte) 1);
+        ByteUtils.writeVarint(105, buf); // header value too long
+        buf.position(buf.limit());
+        buf.flip();
+        DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidHeaderValuePartial() throws IOException {
+        byte attributes = 0;
+        long timestampDelta = 2;
+        int offsetDelta = 1;
+        int sizeOfBodyInBytes = 100;
+        ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+        buf.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf);
+        ByteUtils.writeVarint(offsetDelta, buf);
+        ByteUtils.writeVarint(-1, buf); // null key
+        ByteUtils.writeVarint(-1, buf); // null value
+        ByteUtils.writeVarint(1, buf);
+        ByteUtils.writeVarint(1, buf);
+        buf.put((byte) 1);
+        ByteUtils.writeVarint(105, buf); // header value too long
+        buf.position(buf.limit());
+        buf.flip();
+        DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf));
+        DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
+    }
     @Test(expected = InvalidRecordException.class)
     public void testUnderflowReadingTimestamp() {
         byte attributes = 0;

LogValidator.scala

@@ -23,7 +23,7 @@ import kafka.common.LongRef
 import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
-import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
+import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier}
 import org.apache.kafka.common.utils.Time
 import scala.collection.mutable
@@ -74,21 +74,30 @@ private[kafka] object LogValidator extends Logging {
     }
   }
-  private[kafka] def validateOneBatchRecords(records: MemoryRecords) {
+  private[kafka] def getFirstBatchAndMaybeValidateNoMoreBatches(records: MemoryRecords, sourceCodec: CompressionCodec): RecordBatch = {
     val batchIterator = records.batches.iterator
     if (!batchIterator.hasNext) {
-      throw new InvalidRecordException("Compressed outer record has no batches at all")
+      throw new InvalidRecordException("Record batch has no batches at all")
     }
-    batchIterator.next()
-    if (batchIterator.hasNext) {
-      throw new InvalidRecordException("Compressed outer record has more than one batch")
+    val batch = batchIterator.next()
+    // if the format is v2 and beyond, or if the messages are compressed, we should check there's only one batch.
+    if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || sourceCodec != NoCompressionCodec) {
+      if (batchIterator.hasNext) {
+        throw new InvalidRecordException("Compressed outer record has more than one batch")
+      }
     }
+    batch
   }
-  private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
+  private def validateBatch(firstBatch: RecordBatch, batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
+    // batch magic byte should have the same magic as the first batch
+    if (firstBatch.magic() != batch.magic())
+      throw new InvalidRecordException(s"Batch magic ${batch.magic()} is not the same as the first batch'es magic byte ${firstBatch.magic()}")
     if (isFromClient) {
       if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
         val countFromOffsets = batch.lastOffset - batch.baseOffset + 1
@@ -123,7 +132,7 @@ private[kafka] object LogValidator extends Logging {
   private def validateRecord(batch: RecordBatch, record: Record, now: Long, timestampType: TimestampType,
                              timestampDiffMaxMs: Long, compactedTopic: Boolean): Unit = {
     if (!record.hasMagic(batch.magic))
-      throw new InvalidRecordException(s"Log record magic does not match outer magic ${batch.magic}")
+      throw new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic}")
     // verify the record-level CRC only if this is one of the deep entries of a compressed message
     // set for magic v0 and v1. For non-compressed messages, there is no inner record for magic v0 and v1,
@@ -159,8 +168,10 @@ private[kafka] object LogValidator extends Logging {
     val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
       offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
+    val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec)
     for (batch <- records.batches.asScala) {
-      validateBatch(batch, isFromClient, toMagicValue)
+      validateBatch(firstBatch, batch, isFromClient, toMagicValue)
       for (record <- batch.asScala) {
         validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
@@ -194,13 +205,10 @@ private[kafka] object LogValidator extends Logging {
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
-    if (!records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
-      // for v2 and beyond, we should check there's only one batch.
-      validateOneBatchRecords(records)
-    }
+    val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec)
     for (batch <- records.batches.asScala) {
-      validateBatch(batch, isFromClient, magic)
+      validateBatch(firstBatch, batch, isFromClient, magic)
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
@@ -251,9 +259,8 @@ private[kafka] object LogValidator extends Logging {
   /**
    * We cannot do in place assignment in one of the following situations:
    * 1. Source and target compression codec are different
-   * 2. When magic value to use is 0 because offsets need to be overwritten
-   * 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
-   * 4. Message format conversion is needed.
+   * 2. When the target magic is not equal to batches' magic, meaning format conversion is needed.
+   * 3. When the target magic is equal to V0, meaning absolute offsets need to be re-assigned.
    */
   def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
                                                  offsetCounter: LongRef,
@@ -269,8 +276,12 @@ private[kafka] object LogValidator extends Logging {
                                                  isFromClient: Boolean,
                                                  interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
-    // No in place assignment situation 1 and 2
-    var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
+    if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+      throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " +
+        "are not allowed to use ZStandard compression")
+    // No in place assignment situation 1
+    var inPlaceAssignment = sourceCodec == targetCodec
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     val expectedInnerOffset = new LongRef(0)
@@ -281,43 +292,51 @@ private[kafka] object LogValidator extends Logging {
     // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
     // One exception though is that with format smaller than v2, if sourceCodec is noCompression, then each batch is actually
     // a single record so we'd need to special handle it by creating a single wrapper batch that includes all the records
-    if (sourceCodec != NoCompressionCodec || !records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) {
-      validateOneBatchRecords(records)
-    }
+    val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, sourceCodec)
+    // No in place assignment situation 2 and 3: we only need to check for the first batch because:
+    //   1. For most cases (compressed records, v2, for example), there's only one batch anyways.
+    //   2. For cases that there may be multiple batches, all batches' magic should be the same.
+    if (firstBatch.magic != toMagic || toMagic == RecordBatch.MAGIC_VALUE_V0)
+      inPlaceAssignment = false
+    // Do not compress control records unless they are written compressed
+    if (sourceCodec == NoCompressionCodec && firstBatch.isControlBatch)
+      inPlaceAssignment = true
     val batches = records.batches.asScala
     for (batch <- batches) {
-      validateBatch(batch, isFromClient, toMagic)
+      validateBatch(firstBatch, batch, isFromClient, toMagic)
       uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
-      // Do not compress control records unless they are written compressed
-      if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
-        inPlaceAssignment = true
-      for (record <- batch.asScala) {
-        if (sourceCodec != NoCompressionCodec && record.isCompressed)
-          throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
-            s"compression attribute set: $record")
-        if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
-          throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
-        validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
+      // if we are on version 2 and beyond, and we know we are going for in place assignment,
+      // then we can optimize the iterator to skip key / value / headers since they would not be used at all
+      val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
+        batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
+      else
+        batch.streamingIterator(BufferSupplier.NO_CACHING)
+      try {
+        for (record <- batch.asScala) {
+          if (sourceCodec != NoCompressionCodec && record.isCompressed)
+            throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
              s"compression attribute set: $record")
+          validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
          uncompressedSizeInBytes += record.sizeInBytes()
          if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-          // Check if we need to overwrite offset
-          // No in place assignment situation 3
-          if (record.offset != expectedInnerOffset.getAndIncrement())
-            inPlaceAssignment = false
+            // inner records offset should always be continuous
+            val expectedOffset = expectedInnerOffset.getAndIncrement()
+            if (record.offset != expectedOffset)
+              throw new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset")
            if (record.timestamp > maxTimestamp)
              maxTimestamp = record.timestamp
          }
-        // No in place assignment situation 4
-        if (!record.hasMagic(toMagic))
-          inPlaceAssignment = false
-        validatedRecords += record
-      }
+          validatedRecords += record
+        }
+      } finally {
+        recordsIterator.close()
+      }
     }
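
The comments in the hunk above capture the core decision: when offsets can be assigned in place on a v2 batch, record payloads are never rewritten, so the validator can use the skip iterator; otherwise it falls back to the streaming iterator. A rough Java rendering of that choice follows (the class and method names are hypothetical helpers for illustration; the actual broker code is the Scala above):

import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.CloseableIterator;

final class IteratorChoiceSketch {
    static void iterateForValidation(MutableRecordBatch batch, RecordBatch firstBatch, boolean inPlaceAssignment) {
        // Skip key/value/headers only when nothing from the payload will be re-written.
        CloseableIterator<Record> iterator =
            (inPlaceAssignment && firstBatch.magic() >= RecordBatch.MAGIC_VALUE_V2)
                ? batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
                : batch.streamingIterator(BufferSupplier.NO_CACHING);
        try {
            while (iterator.hasNext()) {
                Record record = iterator.next();
                // per-record validation (timestamps, monotonically increasing offsets, sizes) goes here
            }
        } finally {
            iterator.close();
        }
    }
}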

LogValidatorTest.scala

@@ -56,6 +56,12 @@ class LogValidatorTest {
     checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.GZIP)
   }
+  @Test
+  def testMisMatchMagic(): Unit = {
+    checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP)
+    checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP)
+  }
   private def checkOnlyOneBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) {
     assertThrows[InvalidRecordException] {
       validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
@@ -66,6 +72,12 @@ class LogValidatorTest {
     validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType)
   }
+  private def checkMismatchMagic(batchMagic: Byte, recordMagic: Byte, compressionType: CompressionType): Unit = {
+    assertThrows[InvalidRecordException] {
+      validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compressionType), batchMagic, compressionType, compressionType)
+    }
+  }
   private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = {
     LogValidator.validateMessagesAndAssignOffsets(records,
       new LongRef(0L),
@@ -1094,25 +1106,6 @@ class LogValidatorTest {
       interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset)
   }
-  @Test(expected = classOf[InvalidRecordException])
-  def testInvalidInnerMagicVersion(): Unit = {
-    val offset = 1234567
-    val records = recordsWithInvalidInnerMagic(offset)
-    LogValidator.validateMessagesAndAssignOffsets(records,
-      offsetCounter = new LongRef(offset),
-      time = time,
-      now = System.currentTimeMillis(),
-      sourceCodec = SnappyCompressionCodec,
-      targetCodec = SnappyCompressionCodec,
-      compactedTopic = false,
-      magic = RecordBatch.MAGIC_VALUE_V1,
-      timestampType = TimestampType.CREATE_TIME,
-      timestampDiffMaxMs = 5000L,
-      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true,
-      interBrokerProtocolVersion = ApiVersion.latestVersion)
-  }
   @Test(expected = classOf[InvalidRecordException])
   def testCompressedBatchWithoutRecordsNotAllowed(): Unit = {
     testBatchWithoutRecordsNotAllowed(DefaultCompressionCodec, DefaultCompressionCodec)
@@ -1184,9 +1177,9 @@ class LogValidatorTest {
     builder.build()
   }
-  def createTwoBatchedRecords(magicValue: Byte,
+  private def createTwoBatchedRecords(magicValue: Byte,
                               timestamp: Long = RecordBatch.NO_TIMESTAMP,
                               codec: CompressionType): MemoryRecords = {
     val buf = ByteBuffer.allocate(2048)
     var builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
     builder.append(10L, "1".getBytes(), "a".getBytes())
@@ -1200,6 +1193,16 @@ class LogValidatorTest {
     MemoryRecords.readableRecords(buf.slice())
   }
+  private def createDiscontinuousOffsetRecords(magicValue: Byte,
+                                               codec: CompressionType): MemoryRecords = {
+    val buf = ByteBuffer.allocate(512)
+    val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L)
+    builder.appendWithOffset(0, RecordBatch.NO_TIMESTAMP, null, "hello".getBytes)
+    builder.appendWithOffset(2, RecordBatch.NO_TIMESTAMP, null, "there".getBytes)
+    builder.appendWithOffset(3, RecordBatch.NO_TIMESTAMP, null, "beautiful".getBytes)
+    builder.build()
+  }
   /* check that offsets are assigned consecutively from the given base offset */
   def checkOffsets(records: MemoryRecords, baseOffset: Long) {
     assertTrue("Message set should not be empty", records.records.asScala.nonEmpty)
@@ -1210,18 +1213,20 @@ class LogValidatorTest {
     }
   }
-  private def recordsWithInvalidInnerMagic(initialOffset: Long): MemoryRecords = {
+  private def recordsWithInvalidInnerMagic(batchMagicValue: Byte,
+                                           recordMagicValue: Byte,
+                                           codec: CompressionType): MemoryRecords = {
     val records = (0 until 20).map(id =>
-      LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0,
+      LegacyRecord.create(recordMagicValue,
        RecordBatch.NO_TIMESTAMP,
        id.toString.getBytes,
        id.toString.getBytes))
     val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16))
-    val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP,
+    val builder = MemoryRecords.builder(buffer, batchMagicValue, codec,
       TimestampType.CREATE_TIME, 0L)
-    var offset = initialOffset
+    var offset = 1234567
     records.foreach { record =>
       builder.appendUncheckedWithOffset(offset, record)
       offset += 1

RecordBatchIterationBenchmark.java

@@ -21,6 +21,7 @@ import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
@@ -50,21 +51,23 @@ import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE;
 public class RecordBatchIterationBenchmark {
     private final Random random = new Random(0);
-    private final int batchCount = 5000;
-    private final int maxBatchSize = 10;
+    private final int batchCount = 100;
     public enum Bytes {
         RANDOM, ONES
     }
-    @Param(value = {"LZ4", "SNAPPY", "NONE"})
+    @Param(value = {"10", "50", "200", "500"})
+    private int maxBatchSize = 200;
+    @Param(value = {"LZ4", "SNAPPY", "GZIP", "ZSTD", "NONE"})
     private CompressionType compressionType = CompressionType.NONE;
     @Param(value = {"1", "2"})
     private byte messageVersion = CURRENT_MAGIC_VALUE;
     @Param(value = {"100", "1000", "10000", "100000"})
-    private int messageSize = 100;
+    private int messageSize = 1000;
     @Param(value = {"RANDOM", "ONES"})
     private Bytes bytes = Bytes.RANDOM;
@@ -130,6 +133,7 @@ public class RecordBatchIterationBenchmark {
     }
     @OperationsPerInvocation(value = batchCount)
+    @Fork(jvmArgsAppend = "-Xmx8g")
     @Benchmark
     public void measureStreamingIteratorForVariableBatchSize(Blackhole bh) throws IOException {
         for (int i = 0; i < batchCount; ++i) {
@@ -142,4 +146,17 @@ public class RecordBatchIterationBenchmark {
         }
     }
+    @OperationsPerInvocation(value = batchCount)
+    @Fork(jvmArgsAppend = "-Xmx8g")
+    @Benchmark
+    public void measureSkipIteratorForVariableBatchSize(Blackhole bh) throws IOException {
+        for (int i = 0; i < batchCount; ++i) {
+            for (MutableRecordBatch batch : MemoryRecords.readableRecords(batchBuffers[i].duplicate()).batches()) {
+                try (CloseableIterator<Record> iterator = batch.skipKeyValueIterator(bufferSupplier)) {
+                    while (iterator.hasNext())
+                        bh.consume(iterator.next());
+                }
+            }
+        }
+    }
 }