diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7f51979de0d..fdedef8a7b5 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -202,6 +202,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 440ca6ebb18..5287b4e5712 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -46,6 +46,7 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.Record;
@@ -105,6 +106,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final FetchManagerMetrics sensors;
private final SubscriptionState subscriptions;
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
+ private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
private final ExtendedDeserializer<K> keyDeserializer;
private final ExtendedDeserializer<V> valueDeserializer;
@@ -1039,7 +1041,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
}
- records = currentBatch.streamingIterator();
+ records = currentBatch.streamingIterator(decompressionBufferSupplier);
}
Record record = records.next();
@@ -1326,6 +1328,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
if (nextInLineRecords != null)
nextInLineRecords.drain();
nextInLineExceptionMetadata = null;
+ decompressionBufferSupplier.close();
}
}
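
As a reading aid (not part of the patch): the Fetcher change follows an ownership pattern of one BufferSupplier per consumer, threaded into every streaming iterator and released together with its owner. A minimal sketch under that assumption follows; `BatchReader`, `readAll` and `process` are hypothetical names, and the types come from org.apache.kafka.common.record and org.apache.kafka.common.utils (plus java.io.Closeable).

    // Sketch only: one supplier per reader, reused across batches, closed with the reader.
    class BatchReader implements Closeable {
        private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();

        void readAll(RecordBatch batch) {
            // reuse the same supplier instead of allocating a fresh 64 KB LZ4 buffer per batch
            try (CloseableIterator<Record> records = batch.streamingIterator(decompressionBufferSupplier)) {
                while (records.hasNext())
                    process(records.next());
            }
        }

        void process(Record record) {
            // placeholder for per-record handling
        }

        @Override
        public void close() {
            decompressionBufferSupplier.close(); // release cached buffers with their owner
        }
    }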
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index be696864e7d..e4938be9fac 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -20,16 +20,14 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
@@ -224,8 +222,12 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
*/
@Override
public CloseableIterator<Record> iterator() {
+ return iterator(BufferSupplier.NO_CACHING);
+ }
+
+    private CloseableIterator<Record> iterator(BufferSupplier bufferSupplier) {
if (isCompressed())
- return new DeepRecordsIterator(this, false, Integer.MAX_VALUE);
+ return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier);
return new CloseableIterator<Record>() {
private boolean hasNext = true;
@@ -254,9 +256,9 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
}
@Override
-    public CloseableIterator<Record> streamingIterator() {
+    public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
// the older message format versions do not support streaming, so we return the normal iterator
- return iterator();
+ return iterator(bufferSupplier);
}
static void writeHeader(ByteBuffer buffer, long offset, int size) {
@@ -270,30 +272,36 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
}
private static final class DataLogInputStream implements LogInputStream<AbstractLegacyRecordBatch> {
- private final DataInputStream stream;
+ private final InputStream stream;
protected final int maxMessageSize;
+ private final ByteBuffer offsetAndSizeBuffer;
- DataLogInputStream(DataInputStream stream, int maxMessageSize) {
+ DataLogInputStream(InputStream stream, int maxMessageSize) {
this.stream = stream;
this.maxMessageSize = maxMessageSize;
+ this.offsetAndSizeBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
}
public AbstractLegacyRecordBatch nextBatch() throws IOException {
- try {
- long offset = stream.readLong();
- int size = stream.readInt();
- if (size < LegacyRecord.RECORD_OVERHEAD_V0)
- throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
- if (size > maxMessageSize)
- throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
-
- byte[] recordBuffer = new byte[size];
- stream.readFully(recordBuffer, 0, size);
- ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
- return new BasicLegacyRecordBatch(offset, new LegacyRecord(buf));
- } catch (EOFException e) {
+ offsetAndSizeBuffer.clear();
+ Utils.readFully(stream, offsetAndSizeBuffer);
+ if (offsetAndSizeBuffer.hasRemaining())
return null;
- }
+
+ long offset = offsetAndSizeBuffer.getLong(Records.OFFSET_OFFSET);
+ int size = offsetAndSizeBuffer.getInt(Records.SIZE_OFFSET);
+ if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+ throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
+ if (size > maxMessageSize)
+ throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+ ByteBuffer batchBuffer = ByteBuffer.allocate(size);
+ Utils.readFully(stream, batchBuffer);
+ if (batchBuffer.hasRemaining())
+ return null;
+ batchBuffer.flip();
+
+ return new BasicLegacyRecordBatch(offset, new LegacyRecord(batchBuffer));
}
}
@@ -302,7 +310,8 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
private final long absoluteBaseOffset;
private final byte wrapperMagic;
- private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) {
+ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic,
+ int maxMessageSize, BufferSupplier bufferSupplier) {
LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
this.wrapperMagic = wrapperRecord.magic();
@@ -312,8 +321,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
wrapperMagic + ")");
- DataInputStream stream = new DataInputStream(compressionType.wrapForInput(
- new ByteBufferInputStream(wrapperValue), wrapperRecord.magic()));
+ InputStream stream = compressionType.wrapForInput(wrapperValue, wrapperRecord.magic(), bufferSupplier);
LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize);
long wrapperRecordOffset = wrapperEntry.lastOffset();
diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
new file mode 100644
index 00000000000..2e09f7d1a2c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that
+ * a given KafkaConsumer reuses the same decompression buffer when iterating over fetched records. For small record
+ * batches, allocating a potentially large buffer (64 KB for LZ4) will dominate the cost of decompressing and
+ * iterating over the records in the batch.
+ */
+public abstract class BufferSupplier implements AutoCloseable {
+
+ public static final BufferSupplier NO_CACHING = new BufferSupplier() {
+ @Override
+ public ByteBuffer get(int capacity) {
+ return ByteBuffer.allocate(capacity);
+ }
+
+ @Override
+ public void release(ByteBuffer buffer) {}
+
+ @Override
+ public void close() {}
+ };
+
+ public static BufferSupplier create() {
+ return new DefaultSupplier();
+ }
+
+ /**
+ * Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance.
+ */
+ public abstract ByteBuffer get(int capacity);
+
+ /**
+ * Return the provided buffer to be reused by a subsequent call to `get`.
+ */
+ public abstract void release(ByteBuffer buffer);
+
+ /**
+ * Release all resources associated with this supplier.
+ */
+ public abstract void close();
+
+ private static class DefaultSupplier extends BufferSupplier {
+ // We currently use a single block size, so optimise for that case
+        private final Map<Integer, Deque<ByteBuffer>> bufferMap = new HashMap<>(1);
+
+ @Override
+ public ByteBuffer get(int size) {
+            Deque<ByteBuffer> bufferQueue = bufferMap.get(size);
+ if (bufferQueue == null || bufferQueue.isEmpty())
+ return ByteBuffer.allocate(size);
+ else
+ return bufferQueue.pollFirst();
+ }
+
+ @Override
+ public void release(ByteBuffer buffer) {
+ buffer.clear();
+            Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity());
+ if (bufferQueue == null) {
+ // We currently keep a single buffer in flight, so optimise for that case
+ bufferQueue = new ArrayDeque<>(1);
+ bufferMap.put(buffer.capacity(), bufferQueue);
+ }
+ bufferQueue.addLast(buffer);
+ }
+
+ @Override
+ public void close() {
+ bufferMap.clear();
+ }
+ }
+}
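
For orientation (illustrative, not part of the patch): the intended contract is a get/release pair around each decompression pass, with the owner calling close() once at the end to discard whatever the supplier has cached. The 64 KB size below is only an example block size, and `decompressOneBlock` is a hypothetical helper.

    static void decompressOneBlock(BufferSupplier supplier) {
        ByteBuffer block = supplier.get(64 * 1024);   // freshly allocated or a reused buffer
        try {
            // ... decompress a single block into `block` ...
        } finally {
            supplier.release(block);                  // hand it back for the next get(64 * 1024)
        }
    }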
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 15b59589413..742493b38a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -25,6 +25,7 @@ import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
+import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -39,8 +40,8 @@ public enum CompressionType {
}
@Override
- public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
- return buffer;
+ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+ return new ByteBufferInputStream(buffer);
}
},
@@ -55,9 +56,9 @@ public enum CompressionType {
}
@Override
- public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
- return new GZIPInputStream(buffer);
+ return new GZIPInputStream(new ByteBufferInputStream(buffer));
} catch (Exception e) {
throw new KafkaException(e);
}
@@ -75,9 +76,9 @@ public enum CompressionType {
}
@Override
- public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+ public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
- return (InputStream) SnappyConstructors.INPUT.invoke(buffer);
+ return (InputStream) SnappyConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
} catch (Throwable e) {
throw new KafkaException(e);
}
@@ -88,18 +89,17 @@ public enum CompressionType {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
try {
- return (OutputStream) LZ4Constructors.OUTPUT.invoke(buffer,
- messageVersion == RecordBatch.MAGIC_VALUE_V0);
+ return new KafkaLZ4BlockOutputStream(buffer, messageVersion == RecordBatch.MAGIC_VALUE_V0);
} catch (Throwable e) {
throw new KafkaException(e);
}
}
@Override
- public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
- return (InputStream) LZ4Constructors.INPUT.invoke(buffer,
- messageVersion == RecordBatch.MAGIC_VALUE_V0);
+ return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
+ messageVersion == RecordBatch.MAGIC_VALUE_V0);
} catch (Throwable e) {
throw new KafkaException(e);
}
@@ -116,9 +116,26 @@ public enum CompressionType {
this.rate = rate;
}
- public abstract OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize);
+ /**
+ * Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
+ *
+     * Note: Unlike {@link #wrapForInput}, {@link #wrapForOutput} cannot take {@link ByteBuffer}s directly.
+ * Currently, {@link MemoryRecordsBuilder#writeDefaultBatchHeader()} and {@link MemoryRecordsBuilder#writeLegacyCompressedWrapperHeader()}
+ * write to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
+ * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
+ */
+ public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion, int bufferSize);
- public abstract InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion);
+ /**
+ * Wrap buffer with an InputStream that will decompress data with this CompressionType.
+ *
+ * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.
+ * For small record batches, allocating a potentially large buffer (64 KB for LZ4)
+ * will dominate the cost of decompressing and iterating over the records in the
+ * batch. As such, a supplier that reuses buffers will have a significant
+ * performance impact.
+ */
+ public abstract InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
public static CompressionType forId(int id) {
switch (id) {
@@ -148,21 +165,14 @@ public enum CompressionType {
throw new IllegalArgumentException("Unknown compression name: " + name);
}
- // Dynamically load the Snappy and LZ4 classes so that we only have a runtime dependency on compression algorithms
- // that are used. This is important for platforms that are not supported by the underlying libraries.
- // Note that we are using the initialization-on-demand holder idiom, so it's important that the initialisation
- // is done in separate classes (one per compression type).
-
- private static class LZ4Constructors {
- static final MethodHandle INPUT = findConstructor(
- "org.apache.kafka.common.record.KafkaLZ4BlockInputStream",
- MethodType.methodType(void.class, InputStream.class, Boolean.TYPE));
-
- static final MethodHandle OUTPUT = findConstructor(
- "org.apache.kafka.common.record.KafkaLZ4BlockOutputStream",
- MethodType.methodType(void.class, OutputStream.class, Boolean.TYPE));
-
- }
+ // We should only have a runtime dependency on compression algorithms in case the native libraries don't support
+ // some platforms.
+ //
+ // For Snappy, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
+ // they're only loaded if used.
+ //
+ // For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger
+ // an error until KafkaLZ4BlockInputStream is initialized, which only happens if LZ4 is actually used.
private static class SnappyConstructors {
static final MethodHandle INPUT = findConstructor("org.xerial.snappy.SnappyInputStream",
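
To illustrate the new wrapForInput signature (a sketch, not part of the patch): the caller passes the compressed ByteBuffer plus a supplier; NONE and GZIP ignore the supplier, only LZ4 uses it for its decompression buffer. Here `compressed` is assumed to already hold a GZIP-framed payload, for example one written through the matching wrapForOutput, and the method uses java.io and java.nio types only.

    static byte[] gunzip(ByteBuffer compressed, BufferSupplier supplier) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream in = CompressionType.GZIP.wrapForInput(compressed, RecordBatch.MAGIC_VALUE_V1, supplier)) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1)
                out.write(chunk, 0, n);    // copy decompressed bytes as they become available
        }
        return out.toByteArray();
    }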
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index e61bbc9fe59..5972b426618 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.Checksums;
import org.apache.kafka.common.utils.Crc32C;
import org.apache.kafka.common.utils.Utils;
-import java.io.DataInputStream;
+import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -284,14 +284,14 @@ public class DefaultRecord implements Record {
return result;
}
- public static DefaultRecord readFrom(DataInputStream input,
+ public static DefaultRecord readFrom(DataInput input,
long baseOffset,
long baseTimestamp,
int baseSequence,
Long logAppendTime) throws IOException {
int sizeOfBodyInBytes = ByteUtils.readVarint(input);
ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
- input.readFully(recordBuffer.array(), recordBuffer.arrayOffset(), sizeOfBodyInBytes);
+ input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
return readFrom(recordBuffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index f01116e35d0..2bf889f9dd1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
@@ -226,17 +225,17 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET);
}
-    private CloseableIterator<Record> compressedIterator() {
+    private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier) {
ByteBuffer buffer = this.buffer.duplicate();
buffer.position(RECORDS_OFFSET);
- final DataInputStream stream = new DataInputStream(compressionType().wrapForInput(
- new ByteBufferInputStream(buffer), magic()));
+ final DataInputStream inputStream = new DataInputStream(compressionType().wrapForInput(buffer, magic(),
+ bufferSupplier));
return new RecordIterator() {
@Override
protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
- return DefaultRecord.readFrom(stream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+ return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
} catch (IOException e) {
throw new KafkaException("Failed to decompress record stream", e);
}
@@ -245,7 +244,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
@Override
public void close() {
try {
- stream.close();
+ inputStream.close();
} catch (IOException e) {
throw new KafkaException("Failed to close record stream", e);
}
@@ -274,7 +273,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
// for a normal iterator, we cannot ensure that the underlying compression stream is closed,
// so we decompress the full record set here. Use cases which call for a lower memory footprint
// can use `streamingIterator` at the cost of additional complexity
-        try (CloseableIterator<Record> iterator = compressedIterator()) {
+        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING)) {
            List<Record> records = new ArrayList<>(count());
while (iterator.hasNext())
records.add(iterator.next());
@@ -284,9 +283,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
@Override
-    public CloseableIterator<Record> streamingIterator() {
+    public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
if (isCompressed())
- return compressedIterator();
+ return compressedIterator(bufferSupplier);
else
return uncompressedIterator();
}
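
The comment in the hunk above spells out the trade-off between the two iteration paths; roughly, as a sketch (not part of the patch), assuming `batch` is any compressed DefaultRecordBatch and `supplier` a reusable BufferSupplier:

    static void readTwice(DefaultRecordBatch batch, BufferSupplier supplier) {
        // Eager: iterator() decompresses the whole batch up front, so nothing stays
        // open even if the caller never closes anything.
        for (Record record : batch) {
            // ...
        }

        // Lazy: streamingIterator() decompresses on demand and reuses the supplier's
        // buffer, but the iterator must be closed to release the stream and the buffer.
        try (CloseableIterator<Record> records = batch.streamingIterator(supplier)) {
            while (records.hasNext()) {
                Record record = records.next();
                // ...
            }
        }
    }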
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 5fe1cef0130..57fec4fc686 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -218,9 +218,9 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.FileChannelRecordBatch> {
-        public CloseableIterator<Record> streamingIterator() {
+        public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
loadUnderlyingRecordBatch();
- return underlying.streamingIterator();
+ return underlying.streamingIterator(bufferSupplier);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index a53690c72af..56f10581db4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -16,79 +16,75 @@
*/
package org.apache.kafka.common.record;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
-import org.apache.kafka.common.utils.ByteUtils;
-
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+
/**
* A partial implementation of the v1.5.1 LZ4 Frame format.
*
- * @see LZ4 Frame Format
+ * @see LZ4 Frame Format
+ *
+ * This class is not thread-safe.
*/
-public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+public final class KafkaLZ4BlockInputStream extends InputStream {
public static final String PREMATURE_EOS = "Stream ended prematurely";
public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
- private final LZ4SafeDecompressor decompressor;
- private final XXHash32 checksum;
- private final byte[] buffer;
- private final byte[] compressedBuffer;
- private final int maxBlockSize;
+ private static final LZ4SafeDecompressor DECOMPRESSOR = LZ4Factory.fastestInstance().safeDecompressor();
+ private static final XXHash32 CHECKSUM = XXHashFactory.fastestInstance().hash32();
+
+ private final ByteBuffer in;
private final boolean ignoreFlagDescriptorChecksum;
+ private final BufferSupplier bufferSupplier;
+ private final ByteBuffer decompressionBuffer;
+ // `flg` and `maxBlockSize` are effectively final, they are initialised in the `readHeader` method that is only
+ // invoked from the constructor
private FLG flg;
- private BD bd;
- private int bufferOffset;
- private int bufferSize;
+ private int maxBlockSize;
+
+ // If a block is compressed, this is the same as `decompressionBuffer`. If a block is not compressed, this is
+ // a slice of `in` to avoid unnecessary copies.
+ private ByteBuffer decompressedBuffer;
private boolean finished;
/**
* Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
*
- * @param in The stream to decompress
+ * @param in The byte buffer to decompress
* @param ignoreFlagDescriptorChecksum for compatibility with old kafka clients, ignore incorrect HC byte
* @throws IOException
*/
- public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
- super(in);
- decompressor = LZ4Factory.fastestInstance().safeDecompressor();
- checksum = XXHashFactory.fastestInstance().hash32();
+ public KafkaLZ4BlockInputStream(ByteBuffer in, BufferSupplier bufferSupplier, boolean ignoreFlagDescriptorChecksum) throws IOException {
this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
+ this.in = in.duplicate().order(ByteOrder.LITTLE_ENDIAN);
+ this.bufferSupplier = bufferSupplier;
readHeader();
- maxBlockSize = bd.getBlockMaximumSize();
- buffer = new byte[maxBlockSize];
- compressedBuffer = new byte[maxBlockSize];
- bufferOffset = 0;
- bufferSize = 0;
+ decompressionBuffer = bufferSupplier.get(maxBlockSize);
+ if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
+ // require array backed decompression buffer with zero offset
+ // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
+ throw new RuntimeException("decompression buffer must have backing array with zero array offset");
+ }
finished = false;
}
- /**
- * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
- *
- * @param in The stream to decompress
- * @throws IOException
- */
- public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
- this(in, false);
- }
-
/**
* Check whether KafkaLZ4BlockInputStream is configured to ignore the
* Frame Descriptor checksum, which is useful for compatibility with
@@ -99,43 +95,50 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
}
/**
- * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+ * Reads the magic number and frame descriptor from input buffer.
*
* @throws IOException
*/
private void readHeader() throws IOException {
- byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
-
// read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
- int headerOffset = 6;
- if (in.read(header, 0, headerOffset) != headerOffset) {
+ if (in.remaining() < 6) {
throw new IOException(PREMATURE_EOS);
}
- if (MAGIC != ByteUtils.readUnsignedIntLE(header, headerOffset - 6)) {
+ if (MAGIC != in.getInt()) {
throw new IOException(NOT_SUPPORTED);
}
- flg = FLG.fromByte(header[headerOffset - 2]);
- bd = BD.fromByte(header[headerOffset - 1]);
+ // mark start of data to checksum
+ in.mark();
+
+ flg = FLG.fromByte(in.get());
+ maxBlockSize = BD.fromByte(in.get()).getBlockMaximumSize();
if (flg.isContentSizeSet()) {
- if (in.read(header, headerOffset, 8) != 8)
+ if (in.remaining() < 8) {
throw new IOException(PREMATURE_EOS);
- headerOffset += 8;
+ }
+ in.position(in.position() + 8);
}
// Final byte of Frame Descriptor is HC checksum
- header[headerOffset++] = (byte) in.read();
// Old implementations produced incorrect HC checksums
- if (ignoreFlagDescriptorChecksum)
+ if (ignoreFlagDescriptorChecksum) {
+ in.position(in.position() + 1);
return;
+ }
- int offset = 4;
- int len = headerOffset - offset - 1; // dont include magic bytes or HC
- byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
- if (hash != header[headerOffset - 1])
+ int len = in.position() - in.reset().position();
+
+ int hash = in.hasArray() ?
+ // workaround for https://github.com/lz4/lz4-java/pull/65
+ CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
+ CHECKSUM.hash(in, in.position(), len, 0);
+ in.position(in.position() + len);
+ if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+ }
}
/**
@@ -145,46 +148,70 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
* @throws IOException
*/
private void readBlock() throws IOException {
- int blockSize = ByteUtils.readUnsignedIntLE(in);
+ if (in.remaining() < 4) {
+ throw new IOException(PREMATURE_EOS);
+ }
+
+ int blockSize = in.getInt();
+ boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+ blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
// Check for EndMark
if (blockSize == 0) {
finished = true;
if (flg.isContentChecksumSet())
- ByteUtils.readUnsignedIntLE(in); // TODO: verify this content checksum
+ in.getInt(); // TODO: verify this content checksum
return;
} else if (blockSize > maxBlockSize) {
throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
}
- boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
- byte[] bufferToRead;
- if (compressed) {
- bufferToRead = compressedBuffer;
- } else {
- blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
- bufferToRead = buffer;
- bufferSize = blockSize;
- }
-
- if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+ if (in.remaining() < blockSize) {
throw new IOException(PREMATURE_EOS);
}
- // verify checksum
- if (flg.isBlockChecksumSet() && ByteUtils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
- throw new IOException(BLOCK_HASH_MISMATCH);
- }
-
if (compressed) {
try {
- bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+ // workaround for https://github.com/lz4/lz4-java/pull/65
+ final int bufferSize;
+ if (in.hasArray()) {
+ bufferSize = DECOMPRESSOR.decompress(
+ in.array(),
+ in.position() + in.arrayOffset(),
+ blockSize,
+ decompressionBuffer.array(),
+ 0,
+ maxBlockSize
+ );
+ } else {
+ // decompressionBuffer has zero arrayOffset, so we don't need to worry about
+ // https://github.com/lz4/lz4-java/pull/65
+ bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
+ }
+ decompressionBuffer.position(0);
+ decompressionBuffer.limit(bufferSize);
+ decompressedBuffer = decompressionBuffer;
} catch (LZ4Exception e) {
throw new IOException(e);
}
+ } else {
+ decompressedBuffer = in.slice();
+ decompressedBuffer.limit(blockSize);
}
- bufferOffset = 0;
+ // verify checksum
+ if (flg.isBlockChecksumSet()) {
+ // workaround for https://github.com/lz4/lz4-java/pull/65
+ int hash = in.hasArray() ?
+ CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
+ CHECKSUM.hash(in, in.position(), blockSize, 0);
+ in.position(in.position() + blockSize);
+ if (hash != in.getInt()) {
+ throw new IOException(BLOCK_HASH_MISMATCH);
+ }
+ } else {
+ in.position(in.position() + blockSize);
+ }
}
@Override
@@ -199,7 +226,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
return -1;
}
- return buffer[bufferOffset++] & 0xFF;
+ return decompressedBuffer.get() & 0xFF;
}
@Override
@@ -215,8 +242,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
return -1;
}
len = Math.min(len, available());
- System.arraycopy(buffer, bufferOffset, b, off, len);
- bufferOffset += len;
+
+ decompressedBuffer.get(b, off, len);
return len;
}
@@ -231,28 +258,28 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
if (finished) {
return 0;
}
- n = Math.min(n, available());
- bufferOffset += n;
- return n;
+ int skipped = (int) Math.min(n, available());
+ decompressedBuffer.position(decompressedBuffer.position() + skipped);
+ return skipped;
}
@Override
public int available() throws IOException {
- return bufferSize - bufferOffset;
+ return decompressedBuffer == null ? 0 : decompressedBuffer.remaining();
}
@Override
public void close() throws IOException {
- in.close();
+ bufferSupplier.release(decompressionBuffer);
}
@Override
- public synchronized void mark(int readlimit) {
+ public void mark(int readlimit) {
throw new RuntimeException("mark not supported");
}
@Override
- public synchronized void reset() throws IOException {
+ public void reset() throws IOException {
throw new RuntimeException("reset not supported");
}
@@ -260,5 +287,4 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
public boolean markSupported() {
return false;
}
-
}
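
A hedged round-trip sketch of the rewritten LZ4 stream (test-style usage, not part of the patch). It relies only on constructors visible in this diff: KafkaLZ4BlockOutputStream(OutputStream, boolean) and KafkaLZ4BlockInputStream(ByteBuffer, BufferSupplier, boolean); `lz4RoundTrip` is a hypothetical name and the example uses only java.io and java.nio types otherwise.

    static byte[] lz4RoundTrip(byte[] payload) throws IOException {
        // compress into an LZ4 frame; closing the stream writes the end mark
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(sink, false)) {
            out.write(payload);
        }

        // decompress straight from a ByteBuffer, reusing a cached decompression buffer
        BufferSupplier supplier = BufferSupplier.create();
        byte[] decompressed = new byte[payload.length];
        try (KafkaLZ4BlockInputStream in =
                     new KafkaLZ4BlockInputStream(ByteBuffer.wrap(sink.toByteArray()), supplier, false)) {
            int read = 0;
            while (read < decompressed.length) {
                int n = in.read(decompressed, read, decompressed.length - read);
                if (n == -1)
                    break;
                read += n;
            }
        } finally {
            supplier.close();   // drop the cached decompression buffer
        }
        return decompressed;
    }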
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index d029489d3ac..8cfc37be826 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -30,7 +30,9 @@ import net.jpountz.xxhash.XXHashFactory;
/**
* A partial implementation of the v1.5.1 LZ4 Frame format.
*
- * @see LZ4 Frame Format
+ * @see LZ4 Frame Format
+ *
+ * This class is not thread-safe.
*/
public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index db75105e957..ef773dade46 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -219,9 +219,14 @@ public interface RecordBatch extends Iterable<Record> {
* are actually asked for using {@link Iterator#next()}. If the message format does not support streaming
* iteration, then the normal iterator is returned. Either way, callers should ensure that the iterator is closed.
*
+ * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.
+ * For small record batches, allocating a potentially large buffer (64 KB for LZ4)
+ * will dominate the cost of decompressing and iterating over the records in the
+ * batch. As such, a supplier that reuses buffers will have a significant
+ * performance impact.
* @return The closeable iterator
*/
-    CloseableIterator<Record> streamingIterator();
+    CloseableIterator<Record> streamingIterator(BufferSupplier decompressionBufferSupplier);
/**
* Check whether this is a control batch (i.e. whether the control bit is set in the batch attributes).
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 24ee7881aac..9fbc387664a 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -762,6 +762,31 @@ public class Utils {
} while (bytesRead != -1 && destinationBuffer.hasRemaining());
}
+ /**
+ * Read data from the input stream to the given byte buffer until there are no bytes remaining in the buffer or the
+ * end of the stream has been reached.
+ *
+ * @param inputStream Input stream to read from
+ * @param destinationBuffer The buffer into which bytes are to be transferred (it must be backed by an array)
+ *
+ * @throws IOException If an I/O error occurs
+ */
+ public static final void readFully(InputStream inputStream, ByteBuffer destinationBuffer) throws IOException {
+ if (!destinationBuffer.hasArray())
+ throw new IllegalArgumentException("destinationBuffer must be backed by an array");
+ int initialOffset = destinationBuffer.arrayOffset() + destinationBuffer.position();
+ byte[] array = destinationBuffer.array();
+ int length = destinationBuffer.remaining();
+ int totalBytesRead = 0;
+ do {
+ int bytesRead = inputStream.read(array, initialOffset + totalBytesRead, length - totalBytesRead);
+ if (bytesRead == -1)
+ break;
+ totalBytesRead += bytesRead;
+ } while (length > totalBytesRead);
+ destinationBuffer.position(destinationBuffer.position() + totalBytesRead);
+ }
+
public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
while (sourceBuffer.hasRemaining())
channel.write(sourceBuffer);
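
A tiny illustration of the contract documented above (not part of the patch): the buffer's position advances by however many bytes were actually read, and a short read leaves remaining() > 0 rather than throwing. `shortReadExample` is a hypothetical name; it uses java.io.ByteArrayInputStream from the JDK.

    static void shortReadExample() throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3});
        ByteBuffer buf = ByteBuffer.allocate(8);      // heap buffer, so it is array-backed
        Utils.readFully(in, buf);
        // buf.position() == 3 and buf.hasRemaining() == true: the stream ended early,
        // which is how the new DataLogInputStream.nextBatch detects a truncated entry
    }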
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
index 98dc5910a22..fe196c8bad1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionTypeTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.record;
-import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.junit.Test;
@@ -37,7 +36,7 @@ public class CompressionTypeTest {
buffer.rewind();
KafkaLZ4BlockInputStream in = (KafkaLZ4BlockInputStream) CompressionType.LZ4.wrapForInput(
- new ByteBufferInputStream(buffer), RecordBatch.MAGIC_VALUE_V0);
+ buffer, RecordBatch.MAGIC_VALUE_V0, BufferSupplier.NO_CACHING);
assertTrue(in.ignoreFlagDescriptorChecksum());
}
@@ -51,7 +50,7 @@ public class CompressionTypeTest {
buffer.rewind();
KafkaLZ4BlockInputStream in = (KafkaLZ4BlockInputStream) CompressionType.LZ4.wrapForInput(
- new ByteBufferInputStream(buffer), RecordBatch.MAGIC_VALUE_V1);
+ buffer, RecordBatch.MAGIC_VALUE_V1, BufferSupplier.create());
assertFalse(in.ignoreFlagDescriptorChecksum());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index ec858aa53c9..3db1159967d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -260,7 +260,7 @@ public class DefaultRecordBatchTest {
new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
-        try (CloseableIterator<Record> streamingIterator = batch.streamingIterator()) {
+        try (CloseableIterator<Record> streamingIterator = batch.streamingIterator(BufferSupplier.create())) {
TestUtils.checkEquals(streamingIterator, batch.iterator());
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
index 2c3b2fcda0a..222599b8b73 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
@@ -16,63 +16,159 @@
*/
package org.apache.kafka.common.record;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
import net.jpountz.xxhash.XXHashFactory;
+import org.hamcrest.CoreMatchers;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
@RunWith(value = Parameterized.class)
public class KafkaLZ4Test {
+ private final static Random RANDOM = new Random(0);
+
private final boolean useBrokenFlagDescriptorChecksum;
private final boolean ignoreFlagDescriptorChecksum;
private final byte[] payload;
private final boolean close;
+ private final boolean blockChecksum;
- public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload, boolean close) {
- this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
- this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
- this.payload = payload;
- this.close = close;
+ static class Payload {
+ String name;
+ byte[] payload;
+
+ Payload(String name, byte[] payload) {
+ this.name = name;
+ this.payload = payload;
+ }
+
+ @Override
+ public String toString() {
+ return "Payload{" +
+ "size=" + payload.length +
+ ", name='" + name + '\'' +
+ '}';
+ }
}
- @Parameters
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Parameters(name = "{index} useBrokenFlagDescriptorChecksum={0}, ignoreFlagDescriptorChecksum={1}, blockChecksum={2}, close={3}, payload={4}")
public static Collection