diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 182c92c9001..ea4d9d0fe60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -54,6 +54,7 @@ import org.apache.kafka.common.requests.TxnOffsetCommitRequest; import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset; import org.apache.kafka.common.requests.TxnOffsetCommitResponse; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.PrimitiveRef; import org.slf4j.Logger; import java.util.ArrayList; @@ -68,7 +69,6 @@ import java.util.PriorityQueue; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; @@ -655,15 +655,15 @@ public class TransactionManager { } private void startSequencesAtBeginning(TopicPartition topicPartition) { - final AtomicInteger sequence = new AtomicInteger(0); + final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0); topicPartitionBookkeeper.getPartition(topicPartition).resetSequenceNumbers(inFlightBatch -> { log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}", - inFlightBatch.baseSequence(), inFlightBatch.topicPartition, sequence.get()); + inFlightBatch.baseSequence(), inFlightBatch.topicPartition, sequence.value); inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), - inFlightBatch.producerEpoch()), sequence.get(), inFlightBatch.isTransactional()); - sequence.getAndAdd(inFlightBatch.recordCount); + inFlightBatch.producerEpoch()), sequence.value, inFlightBatch.isTransactional()); + sequence.value += inFlightBatch.recordCount; }); - setNextSequence(topicPartition, sequence.get()); + setNextSequence(topicPartition, sequence.value); topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER; } diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 8dd405c0596..cf38d312306 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -227,7 +227,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl return iterator(BufferSupplier.NO_CACHING); } - private CloseableIterator iterator(BufferSupplier bufferSupplier) { + CloseableIterator iterator(BufferSupplier bufferSupplier) { if (isCompressed()) return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier); @@ -503,6 +503,16 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + LegacyRecord.CRC_OFFSET, crc); } + /** + * LegacyRecordBatch does not implement this iterator and would hence fallback to the normal iterator. 
+ * + * @return An iterator over the records contained within this batch + */ + @Override + public CloseableIterator skipKeyValueIterator(BufferSupplier bufferSupplier) { + return CloseableIterator.wrap(iterator(bufferSupplier)); + } + @Override public void writeTo(ByteBufferOutputStream outputStream) { outputStream.write(buffer.duplicate()); diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java index 78ad05046d2..d104fcde68d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.common.record; -abstract class AbstractRecordBatch implements RecordBatch { +abstract class AbstractRecordBatch implements RecordBatch { @Override public boolean hasProducerId() { return RecordBatch.NO_PRODUCER_ID < producerId(); diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 411e647d50b..09bd2e501f7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -43,13 +43,13 @@ public abstract class AbstractRecords implements Records { return true; } - public boolean firstBatchHasCompatibleMagic(byte magic) { + public RecordBatch firstBatch() { Iterator iterator = batches().iterator(); if (!iterator.hasNext()) - return true; + return null; - return iterator.next().magic() <= magic; + return iterator.next(); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index fbe170fbf0c..bf1320ea30c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.Checksums; import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.PrimitiveRef.IntRef; import org.apache.kafka.common.utils.Utils; import java.io.DataInput; @@ -78,14 +80,14 @@ public class DefaultRecord implements Record { private final ByteBuffer value; private final Header[] headers; - private DefaultRecord(int sizeInBytes, - byte attributes, - long offset, - long timestamp, - int sequence, - ByteBuffer key, - ByteBuffer value, - Header[] headers) { + DefaultRecord(int sizeInBytes, + byte attributes, + long offset, + long timestamp, + int sequence, + ByteBuffer key, + ByteBuffer value, + Header[] headers) { this.sizeInBytes = sizeInBytes; this.attributes = attributes; this.offset = offset; @@ -370,6 +372,161 @@ public class DefaultRecord implements Record { } } + public static PartialDefaultRecord readPartiallyFrom(DataInput input, + byte[] skipArray, + long baseOffset, + long baseTimestamp, + int baseSequence, + Long logAppendTime) throws IOException { + int sizeOfBodyInBytes = ByteUtils.readVarint(input); + int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; + + return readPartiallyFrom(input, skipArray, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp, + 
baseSequence, logAppendTime); + } + + private static PartialDefaultRecord readPartiallyFrom(DataInput input, + byte[] skipArray, + int sizeInBytes, + int sizeOfBodyInBytes, + long baseOffset, + long baseTimestamp, + int baseSequence, + Long logAppendTime) throws IOException { + ByteBuffer skipBuffer = ByteBuffer.wrap(skipArray); + // set its limit to 0 to indicate no bytes readable yet + skipBuffer.limit(0); + + try { + // reading the attributes / timestamp / offset and key-size does not require + // any byte array allocation and therefore we can just read them straight-forwardly + IntRef bytesRemaining = PrimitiveRef.ofInt(sizeOfBodyInBytes); + + byte attributes = readByte(skipBuffer, input, bytesRemaining); + long timestampDelta = readVarLong(skipBuffer, input, bytesRemaining); + long timestamp = baseTimestamp + timestampDelta; + if (logAppendTime != null) + timestamp = logAppendTime; + + int offsetDelta = readVarInt(skipBuffer, input, bytesRemaining); + long offset = baseOffset + offsetDelta; + int sequence = baseSequence >= 0 ? + DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) : + RecordBatch.NO_SEQUENCE; + + // first skip key + int keySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + + // then skip value + int valueSize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + + // then skip header + int numHeaders = readVarInt(skipBuffer, input, bytesRemaining); + if (numHeaders < 0) + throw new InvalidRecordException("Found invalid number of record headers " + numHeaders); + for (int i = 0; i < numHeaders; i++) { + int headerKeySize = skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + if (headerKeySize < 0) + throw new InvalidRecordException("Invalid negative header key size " + headerKeySize); + + // headerValueSize + skipLengthDelimitedField(skipBuffer, input, bytesRemaining); + } + + if (bytesRemaining.value > 0 || skipBuffer.remaining() > 0) + throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes + + " bytes in record payload, but there are still bytes remaining"); + + return new PartialDefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, keySize, valueSize); + } catch (BufferUnderflowException | IllegalArgumentException e) { + throw new InvalidRecordException("Found invalid record structure", e); + } + } + + private static byte readByte(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { + if (buffer.remaining() < 1 && bytesRemaining.value > 0) { + readMore(buffer, input, bytesRemaining); + } + + return buffer.get(); + } + + private static long readVarLong(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { + if (buffer.remaining() < 10 && bytesRemaining.value > 0) { + readMore(buffer, input, bytesRemaining); + } + + return ByteUtils.readVarlong(buffer); + } + + private static int readVarInt(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { + if (buffer.remaining() < 5 && bytesRemaining.value > 0) { + readMore(buffer, input, bytesRemaining); + } + + return ByteUtils.readVarint(buffer); + } + + private static int skipLengthDelimitedField(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { + boolean needMore = false; + int sizeInBytes = -1; + int bytesToSkip = -1; + + while (true) { + if (needMore) { + readMore(buffer, input, bytesRemaining); + needMore = false; + } + + if (bytesToSkip < 0) { + if (buffer.remaining() < 5 && bytesRemaining.value > 0) { + 
needMore = true; + } else { + sizeInBytes = ByteUtils.readVarint(buffer); + if (sizeInBytes <= 0) + return sizeInBytes; + else + bytesToSkip = sizeInBytes; + + } + } else { + if (bytesToSkip > buffer.remaining()) { + bytesToSkip -= buffer.remaining(); + buffer.position(buffer.limit()); + needMore = true; + } else { + buffer.position(buffer.position() + bytesToSkip); + return sizeInBytes; + } + } + } + } + + private static void readMore(ByteBuffer buffer, DataInput input, IntRef bytesRemaining) throws IOException { + if (bytesRemaining.value > 0) { + byte[] array = buffer.array(); + + // first copy the remaining bytes to the beginning of the array; + // at most 4 bytes would be shifted here + int stepsToLeftShift = buffer.position(); + int bytesToLeftShift = buffer.remaining(); + for (int i = 0; i < bytesToLeftShift; i++) { + array[i] = array[i + stepsToLeftShift]; + } + + // then try to read more bytes to the remaining of the array + int bytesRead = Math.min(bytesRemaining.value, array.length - bytesToLeftShift); + input.readFully(array, bytesToLeftShift, bytesRead); + buffer.rewind(); + // only those many bytes are readable + buffer.limit(bytesToLeftShift + bytesRead); + + bytesRemaining.value -= bytesRead; + } else { + throw new InvalidRecordException("Invalid record size: expected to read more bytes in record payload"); + } + } + private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) { Header[] headers = new Header[numHeaders]; for (int i = 0; i < numHeaders; i++) { diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index ac37fae6be5..5b53f19dd07 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -128,6 +128,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe private static final int CONTROL_FLAG_MASK = 0x20; private static final byte TIMESTAMP_TYPE_MASK = 0x08; + private static final int MAX_SKIP_BUFFER_SIZE = 2048; + private final ByteBuffer buffer; DefaultRecordBatch(ByteBuffer buffer) { @@ -251,42 +253,30 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET); } - private CloseableIterator compressedIterator(BufferSupplier bufferSupplier) { + private CloseableIterator compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) { final ByteBuffer buffer = this.buffer.duplicate(); buffer.position(RECORDS_OFFSET); final DataInputStream inputStream = new DataInputStream(compressionType().wrapForInput(buffer, magic(), - bufferSupplier)); + bufferSupplier)); - return new RecordIterator() { - @Override - protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) { - try { + if (skipKeyValue) { + // this buffer is used to skip length delimited fields like key, value, headers + byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE]; + + return new StreamRecordIterator(inputStream) { + @Override + protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException { + return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, firstTimestamp, baseSequence, logAppendTime); + } + }; + } else { + return new StreamRecordIterator(inputStream) { + @Override + protected Record doReadRecord(long baseOffset, long 
firstTimestamp, int baseSequence, Long logAppendTime) throws IOException { return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime); - } catch (EOFException e) { - throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached"); - } catch (IOException e) { - throw new KafkaException("Failed to decompress record stream", e); } - } - - @Override - protected boolean ensureNoneRemaining() { - try { - return inputStream.read() == -1; - } catch (IOException e) { - throw new KafkaException("Error checking for remaining bytes after reading batch", e); - } - } - - @Override - public void close() { - try { - inputStream.close(); - } catch (IOException e) { - throw new KafkaException("Failed to close record stream", e); - } - } - }; + }; + } } private CloseableIterator uncompressedIterator() { @@ -321,7 +311,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe // for a normal iterator, we cannot ensure that the underlying compression stream is closed, // so we decompress the full record set here. Use cases which call for a lower memory footprint // can use `streamingIterator` at the cost of additional complexity - try (CloseableIterator iterator = compressedIterator(BufferSupplier.NO_CACHING)) { + try (CloseableIterator iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) { List records = new ArrayList<>(count()); while (iterator.hasNext()) records.add(iterator.next()); @@ -329,10 +319,29 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe } } + @Override + public CloseableIterator skipKeyValueIterator(BufferSupplier bufferSupplier) { + if (count() == 0) { + return CloseableIterator.wrap(Collections.emptyIterator()); + } + + /* + * For uncompressed iterator, it is actually not worth skipping key / value / headers at all since + * its ByteBufferInputStream's skip() function is less efficient compared with just reading it actually + * as it will allocate new byte array. + */ + if (!isCompressed()) + return uncompressedIterator(); + + // we define this to be a closable iterator so that caller (i.e. the log validator) needs to close it + // while we can save memory footprint of not decompressing the full record set ahead of time + return compressedIterator(bufferSupplier, true); + } + @Override public CloseableIterator streamingIterator(BufferSupplier bufferSupplier) { if (isCompressed()) - return compressedIterator(bufferSupplier); + return compressedIterator(bufferSupplier, false); else return uncompressedIterator(); } @@ -543,7 +552,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe private final int numRecords; private int readRecords = 0; - public RecordIterator() { + RecordIterator() { this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? 
maxTimestamp() : null; this.baseOffset = baseOffset(); this.firstTimestamp = firstTimestamp(); @@ -588,6 +597,46 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe } + private abstract class StreamRecordIterator extends RecordIterator { + private final DataInputStream inputStream; + + StreamRecordIterator(DataInputStream inputStream) { + super(); + this.inputStream = inputStream; + } + + abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException; + + @Override + protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) { + try { + return doReadRecord(baseOffset, firstTimestamp, baseSequence, logAppendTime); + } catch (EOFException e) { + throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached"); + } catch (IOException e) { + throw new KafkaException("Failed to decompress record stream", e); + } + } + + @Override + protected boolean ensureNoneRemaining() { + try { + return inputStream.read() == -1; + } catch (IOException e) { + throw new KafkaException("Error checking for remaining bytes after reading batch", e); + } + } + + @Override + public void close() { + try { + inputStream.close(); + } catch (IOException e) { + throw new KafkaException("Failed to close record stream", e); + } + } + } + static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch { DefaultFileChannelRecordBatch(long offset, diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 7512c8273da..054fb861998 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -418,7 +418,7 @@ public class MemoryRecordsBuilder implements AutoCloseable { appendDefaultRecord(offset, timestamp, key, value, headers); return null; } else { - return appendLegacyRecord(offset, timestamp, key, value); + return appendLegacyRecord(offset, timestamp, key, value, magic); } } catch (IOException e) { throw new KafkaException("I/O exception when writing to the append stream, closing", e); @@ -632,7 +632,7 @@ public class MemoryRecordsBuilder implements AutoCloseable { recordWritten(offset, timestamp, sizeInBytes); } - private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException { + private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, byte magic) throws IOException { ensureOpenForRecordAppend(); if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME) timestamp = logAppendTime; diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java index c13bb5a2880..8c0dc2363e9 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.CloseableIterator; /** * A mutable record batch is one that can be modified in place (without copying). 
This is used by the broker @@ -55,4 +56,12 @@ public interface MutableRecordBatch extends RecordBatch { */ void writeTo(ByteBufferOutputStream outputStream); + /** + * Return an iterator which skips parsing key, value and headers from the record stream, and therefore the resulted + * {@code org.apache.kafka.common.record.Record}'s key and value fields would be empty. This iterator is used + * when the read record's key and value are not needed and hence can save some byte buffer allocating / GC overhead. + * + * @return The closeable iterator + */ + CloseableIterator skipKeyValueIterator(BufferSupplier bufferSupplier); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/PartialDefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/PartialDefaultRecord.java new file mode 100644 index 00000000000..67ca1abafbc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/PartialDefaultRecord.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.header.Header; + +import java.nio.ByteBuffer; + +public class PartialDefaultRecord extends DefaultRecord { + + private final int keySize; + private final int valueSize; + + PartialDefaultRecord(int sizeInBytes, + byte attributes, + long offset, + long timestamp, + int sequence, + int keySize, + int valueSize) { + super(sizeInBytes, attributes, offset, timestamp, sequence, null, null, null); + + this.keySize = keySize; + this.valueSize = valueSize; + } + + @Override + public boolean equals(Object o) { + return super.equals(o) && + this.keySize == ((PartialDefaultRecord) o).keySize && + this.valueSize == ((PartialDefaultRecord) o).valueSize; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + keySize; + result = 31 * result + valueSize; + return result; + } + + @Override + public String toString() { + return String.format("PartialDefaultRecord(offset=%d, timestamp=%d, key=%d bytes, value=%d bytes)", + offset(), + timestamp(), + keySize, + valueSize); + } + + @Override + public int keySize() { + return keySize; + } + + @Override + public boolean hasKey() { + return keySize >= 0; + } + + @Override + public ByteBuffer key() { + throw new UnsupportedOperationException("key is skipped in PartialDefaultRecord"); + } + + @Override + public int valueSize() { + return valueSize; + } + + @Override + public boolean hasValue() { + return valueSize >= 0; + } + + @Override + public ByteBuffer value() { + throw new UnsupportedOperationException("value is skipped in PartialDefaultRecord"); + } + + @Override + public Header[] headers() { + throw new UnsupportedOperationException("headers is skipped in PartialDefaultRecord"); + } +} diff 
--git a/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java index 38fba8ec9d4..50b06369b47 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/CloseableIterator.java @@ -27,4 +27,26 @@ import java.util.Iterator; */ public interface CloseableIterator extends Iterator, Closeable { void close(); + + static CloseableIterator wrap(Iterator inner) { + return new CloseableIterator() { + @Override + public void close() {} + + @Override + public boolean hasNext() { + return inner.hasNext(); + } + + @Override + public R next() { + return inner.next(); + } + + @Override + public void remove() { + inner.remove(); + } + }; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/PrimitiveRef.java b/clients/src/main/java/org/apache/kafka/common/utils/PrimitiveRef.java new file mode 100644 index 00000000000..e1bbfe37316 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/PrimitiveRef.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +/** + * Primitive reference used to pass primitive typed values as parameter-by-reference. + * + * This is cheaper than using Atomic references. 
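+ *
+ * An illustrative usage (not part of this patch): accumulating a running value inside a
+ * lambda, where a plain local {@code int} could not be mutated because it must be
+ * effectively final.
+ * <pre>{@code
+ * PrimitiveRef.IntRef nextSequence = PrimitiveRef.ofInt(0);
+ * inFlightBatches.forEach(batch -> nextSequence.value += batch.recordCount);
+ * }</pre>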
+ */ +public class PrimitiveRef { + public static IntRef ofInt(int value) { + return new IntRef(value); + } + + public static class IntRef { + public int value; + + IntRef(int value) { + this.value = value; + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java index ab8cbb7ee2c..34e46adee12 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java @@ -375,6 +375,32 @@ public class DefaultRecordBatchTest { } } + @Test + public void testSkipKeyValueIteratorCorrectness() { + Header[] headers = {new RecordHeader("k1", "v1".getBytes()), new RecordHeader("k2", "v2".getBytes())}; + + MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L, + CompressionType.LZ4, TimestampType.CREATE_TIME, + new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), + new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), + new SimpleRecord(3L, "c".getBytes(), "3".getBytes()), + new SimpleRecord(1000L, "abc".getBytes(), "0".getBytes()), + new SimpleRecord(9999L, "abc".getBytes(), "0".getBytes(), headers) + ); + DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer()); + try (CloseableIterator streamingIterator = batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)) { + assertEquals(Arrays.asList( + new PartialDefaultRecord(9, (byte) 0, 0L, 1L, -1, 1, 1), + new PartialDefaultRecord(9, (byte) 0, 1L, 2L, -1, 1, 1), + new PartialDefaultRecord(9, (byte) 0, 2L, 3L, -1, 1, 1), + new PartialDefaultRecord(12, (byte) 0, 3L, 1000L, -1, 3, 1), + new PartialDefaultRecord(25, (byte) 0, 4L, 9999L, -1, 3, 1) + ), + Utils.toList(streamingIterator) + ); + } + } + @Test public void testIncrementSequence() { assertEquals(10, DefaultRecordBatch.incrementSequence(5, 5)); diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java index 3ff73c9c67b..198f9945d3f 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java @@ -18,10 +18,13 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; +import org.junit.Before; import org.junit.Test; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -32,6 +35,13 @@ import static org.junit.Assert.assertNotNull; public class DefaultRecordTest { + private byte[] skipArray; + + @Before + public void setUp() { + skipArray = new byte[64]; + } + @Test public void testBasicSerde() throws IOException { Header[] headers = new Header[] { @@ -152,6 +162,27 @@ public class DefaultRecordTest { DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); } + @Test(expected = InvalidRecordException.class) + public void testInvalidKeySizePartial() throws IOException { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + int keySize = 105; // use a key size larger than the full message + + ByteBuffer buf = 
ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(keySize, buf); + buf.position(buf.limit()); + + buf.flip(); + DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf)); + DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + @Test(expected = InvalidRecordException.class) public void testInvalidValueSize() throws IOException { byte attributes = 0; @@ -173,6 +204,210 @@ public class DefaultRecordTest { DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); } + @Test(expected = InvalidRecordException.class) + public void testInvalidValueSizePartial() throws IOException { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + int valueSize = 105; // use a value size larger than the full message + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(valueSize, buf); + buf.position(buf.limit()); + + buf.flip(); + DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf)); + DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testInvalidNumHeaders() throws IOException { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(-1, buf); // null value + ByteUtils.writeVarint(-1, buf); // -1 num.headers, not allowed + buf.position(buf.limit()); + + buf.flip(); + DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testInvalidNumHeadersPartial() throws IOException { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(-1, buf); // null value + ByteUtils.writeVarint(-1, buf); // -1 num.headers, not allowed + buf.position(buf.limit()); + + buf.flip(); + DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf)); + DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = StringIndexOutOfBoundsException.class) + public void testInvalidHeaderKey() { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + 
ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(-1, buf); // null value + ByteUtils.writeVarint(1, buf); + ByteUtils.writeVarint(105, buf); // header key too long + buf.position(buf.limit()); + + buf.flip(); + DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testInvalidHeaderKeyPartial() throws IOException { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(-1, buf); // null value + ByteUtils.writeVarint(1, buf); + ByteUtils.writeVarint(105, buf); // header key too long + buf.position(buf.limit()); + + buf.flip(); + DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf)); + DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testNullHeaderKey() { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(-1, buf); // null value + ByteUtils.writeVarint(1, buf); + ByteUtils.writeVarint(-1, buf); // null header key not allowed + buf.position(buf.limit()); + + buf.flip(); + DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testNullHeaderKeyPartial() throws IOException { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(-1, buf); // null value + ByteUtils.writeVarint(1, buf); + ByteUtils.writeVarint(-1, buf); // null header key not allowed + buf.position(buf.limit()); + + buf.flip(); + DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf)); + DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testInvalidHeaderValue() { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); 
+ ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(-1, buf); // null value + ByteUtils.writeVarint(1, buf); + ByteUtils.writeVarint(1, buf); + buf.put((byte) 1); + ByteUtils.writeVarint(105, buf); // header value too long + buf.position(buf.limit()); + + buf.flip(); + DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testInvalidHeaderValuePartial() throws IOException { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(-1, buf); // null value + ByteUtils.writeVarint(1, buf); + ByteUtils.writeVarint(1, buf); + buf.put((byte) 1); + ByteUtils.writeVarint(105, buf); // header value too long + buf.position(buf.limit()); + + buf.flip(); + DataInputStream inputStream = new DataInputStream(new ByteBufferInputStream(buf)); + DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + @Test(expected = InvalidRecordException.class) public void testUnderflowReadingTimestamp() { byte attributes = 0; diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index d10eed8af7d..d7088a675e9 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -23,7 +23,7 @@ import kafka.common.LongRef import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.utils.Logging import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} +import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType, BufferSupplier} import org.apache.kafka.common.utils.Time import scala.collection.mutable @@ -74,21 +74,30 @@ private[kafka] object LogValidator extends Logging { } } - private[kafka] def validateOneBatchRecords(records: MemoryRecords) { + private[kafka] def getFirstBatchAndMaybeValidateNoMoreBatches(records: MemoryRecords, sourceCodec: CompressionCodec): RecordBatch = { val batchIterator = records.batches.iterator if (!batchIterator.hasNext) { - throw new InvalidRecordException("Compressed outer record has no batches at all") + throw new InvalidRecordException("Record batch has no batches at all") } - batchIterator.next() + val batch = batchIterator.next() - if (batchIterator.hasNext) { - throw new InvalidRecordException("Compressed outer record has more than one batch") + // if the format is v2 and beyond, or if the messages are compressed, we should check there's only one batch. 
+ if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || sourceCodec != NoCompressionCodec) { + if (batchIterator.hasNext) { + throw new InvalidRecordException("Compressed outer record has more than one batch") + } } + + batch } - private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = { + private def validateBatch(firstBatch: RecordBatch, batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = { + // batch magic byte should have the same magic as the first batch + if (firstBatch.magic() != batch.magic()) + throw new InvalidRecordException(s"Batch magic ${batch.magic()} is not the same as the first batch'es magic byte ${firstBatch.magic()}") + if (isFromClient) { if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { val countFromOffsets = batch.lastOffset - batch.baseOffset + 1 @@ -123,7 +132,7 @@ private[kafka] object LogValidator extends Logging { private def validateRecord(batch: RecordBatch, record: Record, now: Long, timestampType: TimestampType, timestampDiffMaxMs: Long, compactedTopic: Boolean): Unit = { if (!record.hasMagic(batch.magic)) - throw new InvalidRecordException(s"Log record magic does not match outer magic ${batch.magic}") + throw new InvalidRecordException(s"Log record $record's magic does not match outer magic ${batch.magic}") // verify the record-level CRC only if this is one of the deep entries of a compressed message // set for magic v0 and v1. For non-compressed messages, there is no inner record for magic v0 and v1, @@ -159,8 +168,10 @@ private[kafka] object LogValidator extends Logging { val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType, offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch) + val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) + for (batch <- records.batches.asScala) { - validateBatch(batch, isFromClient, toMagicValue) + validateBatch(firstBatch, batch, isFromClient, toMagicValue) for (record <- batch.asScala) { validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) @@ -194,13 +205,10 @@ private[kafka] object LogValidator extends Logging { var offsetOfMaxTimestamp = -1L val initialOffset = offsetCounter.value - if (!records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) { - // for v2 and beyond, we should check there's only one batch. - validateOneBatchRecords(records) - } + val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) for (batch <- records.batches.asScala) { - validateBatch(batch, isFromClient, magic) + validateBatch(firstBatch, batch, isFromClient, magic) var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L @@ -251,9 +259,8 @@ private[kafka] object LogValidator extends Logging { /** * We cannot do in place assignment in one of the following situations: * 1. Source and target compression codec are different - * 2. When magic value to use is 0 because offsets need to be overwritten - * 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten. - * 4. Message format conversion is needed. + * 2. When the target magic is not equal to batches' magic, meaning format conversion is needed. + * 3. When the target magic is equal to V0, meaning absolute offsets need to be re-assigned. 
*/ def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords, offsetCounter: LongRef, @@ -269,8 +276,12 @@ private[kafka] object LogValidator extends Logging { isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = { - // No in place assignment situation 1 and 2 - var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0 + if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0) + throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + + "are not allowed to use ZStandard compression") + + // No in place assignment situation 1 + var inPlaceAssignment = sourceCodec == targetCodec var maxTimestamp = RecordBatch.NO_TIMESTAMP val expectedInnerOffset = new LongRef(0) @@ -281,43 +292,51 @@ private[kafka] object LogValidator extends Logging { // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException // One exception though is that with format smaller than v2, if sourceCodec is noCompression, then each batch is actually // a single record so we'd need to special handle it by creating a single wrapper batch that includes all the records - if (sourceCodec != NoCompressionCodec || !records.firstBatchHasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1)) { - validateOneBatchRecords(records) - } + val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, sourceCodec) + + // No in place assignment situation 2 and 3: we only need to check for the first batch because: + // 1. For most cases (compressed records, v2, for example), there's only one batch anyways. + // 2. For cases that there may be multiple batches, all batches' magic should be the same. 
+ if (firstBatch.magic != toMagic || toMagic == RecordBatch.MAGIC_VALUE_V0) + inPlaceAssignment = false + + // Do not compress control records unless they are written compressed + if (sourceCodec == NoCompressionCodec && firstBatch.isControlBatch) + inPlaceAssignment = true val batches = records.batches.asScala - for (batch <- batches) { - validateBatch(batch, isFromClient, toMagic) + validateBatch(firstBatch, batch, isFromClient, toMagic) uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType()) - // Do not compress control records unless they are written compressed - if (sourceCodec == NoCompressionCodec && batch.isControlBatch) - inPlaceAssignment = true + // if we are on version 2 and beyond, and we know we are going for in place assignment, + // then we can optimize the iterator to skip key / value / headers since they would not be used at all + val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2) + batch.skipKeyValueIterator(BufferSupplier.NO_CACHING) + else + batch.streamingIterator(BufferSupplier.NO_CACHING) - for (record <- batch.asScala) { - if (sourceCodec != NoCompressionCodec && record.isCompressed) - throw new InvalidRecordException("Compressed outer record should not have an inner record with a " + - s"compression attribute set: $record") - if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0) - throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression") - validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) + try { + for (record <- batch.asScala) { + if (sourceCodec != NoCompressionCodec && record.isCompressed) + throw new InvalidRecordException("Compressed outer record should not have an inner record with a " + + s"compression attribute set: $record") + validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic) - uncompressedSizeInBytes += record.sizeInBytes() - if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) { - // Check if we need to overwrite offset - // No in place assignment situation 3 - if (record.offset != expectedInnerOffset.getAndIncrement()) - inPlaceAssignment = false - if (record.timestamp > maxTimestamp) - maxTimestamp = record.timestamp + uncompressedSizeInBytes += record.sizeInBytes() + if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) { + // inner records offset should always be continuous + val expectedOffset = expectedInnerOffset.getAndIncrement() + if (record.offset != expectedOffset) + throw new InvalidRecordException(s"Inner record $record inside the compressed record batch does not have incremental offsets, expected offset is $expectedOffset") + if (record.timestamp > maxTimestamp) + maxTimestamp = record.timestamp + } + + validatedRecords += record } - - // No in place assignment situation 4 - if (!record.hasMagic(toMagic)) - inPlaceAssignment = false - - validatedRecords += record + } finally { + recordsIterator.close() } } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 324314f151e..26c1e5ff026 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -56,6 +56,12 @@ class LogValidatorTest { 
checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE, CompressionType.GZIP) } + @Test + def testMisMatchMagic(): Unit = { + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP) + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP) + } + private def checkOnlyOneBatch(magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType) { assertThrows[InvalidRecordException] { validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType) @@ -66,6 +72,12 @@ class LogValidatorTest { validateMessages(createTwoBatchedRecords(magic, 0L, sourceCompressionType), magic, sourceCompressionType, targetCompressionType) } + private def checkMismatchMagic(batchMagic: Byte, recordMagic: Byte, compressionType: CompressionType): Unit = { + assertThrows[InvalidRecordException] { + validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compressionType), batchMagic, compressionType, compressionType) + } + } + private def validateMessages(records: MemoryRecords, magic: Byte, sourceCompressionType: CompressionType, targetCompressionType: CompressionType): Unit = { LogValidator.validateMessagesAndAssignOffsets(records, new LongRef(0L), @@ -1094,25 +1106,6 @@ class LogValidatorTest { interBrokerProtocolVersion = ApiVersion.latestVersion).validatedRecords, offset) } - @Test(expected = classOf[InvalidRecordException]) - def testInvalidInnerMagicVersion(): Unit = { - val offset = 1234567 - val records = recordsWithInvalidInnerMagic(offset) - LogValidator.validateMessagesAndAssignOffsets(records, - offsetCounter = new LongRef(offset), - time = time, - now = System.currentTimeMillis(), - sourceCodec = SnappyCompressionCodec, - targetCodec = SnappyCompressionCodec, - compactedTopic = false, - magic = RecordBatch.MAGIC_VALUE_V1, - timestampType = TimestampType.CREATE_TIME, - timestampDiffMaxMs = 5000L, - partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH, - isFromClient = true, - interBrokerProtocolVersion = ApiVersion.latestVersion) - } - @Test(expected = classOf[InvalidRecordException]) def testCompressedBatchWithoutRecordsNotAllowed(): Unit = { testBatchWithoutRecordsNotAllowed(DefaultCompressionCodec, DefaultCompressionCodec) @@ -1184,9 +1177,9 @@ class LogValidatorTest { builder.build() } - def createTwoBatchedRecords(magicValue: Byte, - timestamp: Long = RecordBatch.NO_TIMESTAMP, - codec: CompressionType): MemoryRecords = { + private def createTwoBatchedRecords(magicValue: Byte, + timestamp: Long = RecordBatch.NO_TIMESTAMP, + codec: CompressionType): MemoryRecords = { val buf = ByteBuffer.allocate(2048) var builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) builder.append(10L, "1".getBytes(), "a".getBytes()) @@ -1200,6 +1193,16 @@ class LogValidatorTest { MemoryRecords.readableRecords(buf.slice()) } + private def createDiscontinuousOffsetRecords(magicValue: Byte, + codec: CompressionType): MemoryRecords = { + val buf = ByteBuffer.allocate(512) + val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L) + builder.appendWithOffset(0, RecordBatch.NO_TIMESTAMP, null, "hello".getBytes) + builder.appendWithOffset(2, RecordBatch.NO_TIMESTAMP, null, "there".getBytes) + builder.appendWithOffset(3, RecordBatch.NO_TIMESTAMP, null, "beautiful".getBytes) + builder.build() + } + /* check that offsets are assigned consecutively from the given 
base offset */ def checkOffsets(records: MemoryRecords, baseOffset: Long) { assertTrue("Message set should not be empty", records.records.asScala.nonEmpty) @@ -1210,18 +1213,20 @@ class LogValidatorTest { } } - private def recordsWithInvalidInnerMagic(initialOffset: Long): MemoryRecords = { + private def recordsWithInvalidInnerMagic(batchMagicValue: Byte, + recordMagicValue: Byte, + codec: CompressionType): MemoryRecords = { val records = (0 until 20).map(id => - LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, + LegacyRecord.create(recordMagicValue, RecordBatch.NO_TIMESTAMP, id.toString.getBytes, id.toString.getBytes)) val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16)) - val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.GZIP, + val builder = MemoryRecords.builder(buffer, batchMagicValue, codec, TimestampType.CREATE_TIME, 0L) - var offset = initialOffset + var offset = 1234567 records.foreach { record => builder.appendUncheckedWithOffset(offset, record) offset += 1 diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java index 75fec048174..73552c2131e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.record.BufferSupplier; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; @@ -50,21 +51,23 @@ import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE; public class RecordBatchIterationBenchmark { private final Random random = new Random(0); - private final int batchCount = 5000; - private final int maxBatchSize = 10; + private final int batchCount = 100; public enum Bytes { RANDOM, ONES } - @Param(value = {"LZ4", "SNAPPY", "NONE"}) + @Param(value = {"10", "50", "200", "500"}) + private int maxBatchSize = 200; + + @Param(value = {"LZ4", "SNAPPY", "GZIP", "ZSTD", "NONE"}) private CompressionType compressionType = CompressionType.NONE; @Param(value = {"1", "2"}) private byte messageVersion = CURRENT_MAGIC_VALUE; @Param(value = {"100", "1000", "10000", "100000"}) - private int messageSize = 100; + private int messageSize = 1000; @Param(value = {"RANDOM", "ONES"}) private Bytes bytes = Bytes.RANDOM; @@ -130,6 +133,7 @@ public class RecordBatchIterationBenchmark { } @OperationsPerInvocation(value = batchCount) + @Fork(jvmArgsAppend = "-Xmx8g") @Benchmark public void measureStreamingIteratorForVariableBatchSize(Blackhole bh) throws IOException { for (int i = 0; i < batchCount; ++i) { @@ -142,4 +146,17 @@ public class RecordBatchIterationBenchmark { } } + @OperationsPerInvocation(value = batchCount) + @Fork(jvmArgsAppend = "-Xmx8g") + @Benchmark + public void measureSkipIteratorForVariableBatchSize(Blackhole bh) throws IOException { + for (int i = 0; i < batchCount; ++i) { + for (MutableRecordBatch batch : MemoryRecords.readableRecords(batchBuffers[i].duplicate()).batches()) { + try (CloseableIterator iterator = 
batch.skipKeyValueIterator(bufferSupplier)) {
+                    while (iterator.hasNext())
+                        bh.consume(iterator.next());
+                }
+            }
+        }
+    }
 }
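
As a usage note on the API exercised by this benchmark: the sketch below shows how a caller might walk batches with the new skip iterator and consume only record sizes. It is a minimal illustration written against the interfaces added in this patch; the class name, the method name, and the assumption that a MemoryRecords instance is already at hand are mine, not part of the change.

    import org.apache.kafka.common.record.BufferSupplier;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MutableRecordBatch;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.utils.CloseableIterator;

    public class SkipIteratorUsageSketch {
        // Sums key/value sizes without materializing keys, values or headers.
        static long totalKeyValueBytes(MemoryRecords records) {
            long total = 0;
            for (MutableRecordBatch batch : records.batches()) {
                // The caller owns the iterator and must close it so any decompression stream is released.
                try (CloseableIterator<Record> iterator = batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)) {
                    while (iterator.hasNext()) {
                        Record record = iterator.next();
                        // For compressed v2 batches these are PartialDefaultRecord instances:
                        // sizes are known, but key()/value()/headers() would throw.
                        total += Math.max(record.keySize(), 0) + Math.max(record.valueSize(), 0);
                    }
                }
            }
            return total;
        }
    }

For uncompressed batches, skipKeyValueIterator simply falls back to the regular uncompressed iterator, so the same loop works for both cases.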
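
The trickiest mechanism in the patch is the bounded skip buffer in DefaultRecord.readPartiallyFrom: every varint/varlong read goes through a small window over skipArray, and when fewer bytes remain in the window than the worst-case field width, readMore shifts the unread tail to the front of the array and refills from the stream. A compact sketch of that refill step, assuming a heap-backed buffer and using illustrative names of my own (the real readMore also decrements its bytesRemaining counter and raises InvalidRecordException when the stream runs dry early), looks like this:

    import java.io.DataInput;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    final class SkipWindowRefillSketch {
        // Shift the unread tail of the window to the front, then top the window up from the stream.
        static int refill(ByteBuffer window, DataInput input, int bytesLeftInRecord) throws IOException {
            byte[] array = window.array();
            int leftover = window.remaining();
            // at most a few bytes are shifted here (a varlong needs at most 10 bytes of lookahead)
            System.arraycopy(array, window.position(), array, 0, leftover);
            int bytesRead = Math.min(bytesLeftInRecord, array.length - leftover);
            input.readFully(array, leftover, bytesRead);
            window.position(0);
            window.limit(leftover + bytesRead); // only the refilled region is readable
            return bytesRead;                   // caller subtracts this from its remaining-bytes counter
        }
    }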