diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index afa85a41194..37d53b81dcb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -77,7 +77,7 @@ public class Compressor {
@Override
public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
- .getConstructor(InputStream.class);
+ .getConstructor(InputStream.class, Boolean.TYPE);
}
});
@@ -275,7 +275,7 @@ public class Compressor {
}
}
- static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type) {
+ static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
try {
switch (type) {
case NONE:
@@ -291,7 +291,8 @@ public class Compressor {
}
case LZ4:
try {
- InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer);
+ InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
+ messageVersion == Record.MAGIC_VALUE_V0);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
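
The LZ4 reader is still looked up reflectively (presumably so that lz4-java stays an optional dependency); the only change is the extra boolean argument derived from the message magic. A minimal non-reflective sketch of what the call above resolves to, assuming the two-argument constructor added later in this patch:

    // Direct equivalent of the reflective call: only v0 messages, which were written
    // with the broken frame descriptor checksum, get the relaxed validation.
    InputStream stream = new KafkaLZ4BlockInputStream(buffer,
            messageVersion == Record.MAGIC_VALUE_V0);
    return new DataInputStream(stream);
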
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 372d4f4fb43..92718d896bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -36,15 +36,14 @@ import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;
/**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
*
- * @see LZ4 Framing
- * Format Spec
+ * @see LZ4 Frame Format
*/
public final class KafkaLZ4BlockInputStream extends FilterInputStream {
public static final String PREMATURE_EOS = "Stream ended prematurely";
- public static final String NOT_SUPPORTED = "Stream unsupported";
+ public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
@@ -53,6 +52,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
private final byte[] buffer;
private final byte[] compressedBuffer;
private final int maxBlockSize;
+ private final boolean ignoreFlagDescriptorChecksum;
private FLG flg;
private BD bd;
private int bufferOffset;
@@ -63,12 +63,14 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
* Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
*
* @param in The stream to decompress
+ * @param ignoreFlagDescriptorChecksum For compatibility with old Kafka clients, ignore the incorrect HC byte in the frame descriptor
* @throws IOException
*/
- public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+ public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
super(in);
decompressor = LZ4Factory.fastestInstance().safeDecompressor();
checksum = XXHashFactory.fastestInstance().hash32();
+ this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
readHeader();
maxBlockSize = bd.getBlockMaximumSize();
buffer = new byte[maxBlockSize];
@@ -78,6 +80,25 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
finished = false;
}
+ /**
+ * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+ *
+ * @param in The stream to decompress
+ * @throws IOException
+ */
+ public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+ this(in, false);
+ }
+
+ /**
+ * Check whether KafkaLZ4BlockInputStream is configured to ignore the
+ * Frame Descriptor checksum, which is useful for compatibility with
+ * old client implementations that use incorrect checksum calculations.
+ */
+ public boolean ignoreFlagDescriptorChecksum() {
+ return this.ignoreFlagDescriptorChecksum;
+ }
+
/**
* Reads the magic number and frame descriptor from the underlying {@link InputStream}.
*
@@ -87,25 +108,35 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
// read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
- bufferOffset = 6;
- if (in.read(header, 0, bufferOffset) != bufferOffset) {
+ int headerOffset = 6;
+ if (in.read(header, 0, headerOffset) != headerOffset) {
throw new IOException(PREMATURE_EOS);
}
- if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset - 6)) {
+ if (MAGIC != Utils.readUnsignedIntLE(header, headerOffset - 6)) {
throw new IOException(NOT_SUPPORTED);
}
- flg = FLG.fromByte(header[bufferOffset - 2]);
- bd = BD.fromByte(header[bufferOffset - 1]);
- // TODO read uncompressed content size, update flg.validate()
- // TODO read dictionary id, update flg.validate()
+ flg = FLG.fromByte(header[headerOffset - 2]);
+ bd = BD.fromByte(header[headerOffset - 1]);
- // check stream descriptor hash
- byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
- header[bufferOffset++] = (byte) in.read();
- if (hash != header[bufferOffset - 1]) {
- throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+ if (flg.isContentSizeSet()) {
+ if (in.read(header, headerOffset, 8) != 8)
+ throw new IOException(PREMATURE_EOS);
+ headerOffset += 8;
}
+
+ // Final byte of Frame Descriptor is HC checksum
+ header[headerOffset++] = (byte) in.read();
+
+ // Old implementations produced incorrect HC checksums
+ if (ignoreFlagDescriptorChecksum)
+ return;
+
+ int offset = 4;
+ int len = headerOffset - offset - 1; // don't include the magic bytes or the HC byte
+ byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
+ if (hash != header[headerOffset - 1])
+ throw new IOException(DESCRIPTOR_HASH_MISMATCH);
}
/**
@@ -120,7 +151,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
// Check for EndMark
if (blockSize == 0) {
finished = true;
- // TODO implement content checksum, update flg.validate()
+ if (flg.isContentChecksumSet())
+ Utils.readUnsignedIntLE(in); // TODO: verify this content checksum
return;
} else if (blockSize > maxBlockSize) {
throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
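
Per the v1.5.1 frame spec, the HC byte is the second byte of the XXH32 hash of the frame descriptor only, i.e. starting after the four magic bytes; the pre-patch code hashed from offset 0 and so included the magic bytes, which is exactly the incompatibility the new flag tolerates. A small sketch of the two computations for a header with no content size field (4 magic bytes + FLG + BD + HC), using the same net.jpountz API as the code above; the class name is hypothetical:

    import net.jpountz.xxhash.XXHash32;
    import net.jpountz.xxhash.XXHashFactory;

    class DescriptorChecksumSketch {
        // Header layout assumed here: 4 magic bytes, FLG, BD, HC (content size bit unset).
        static byte specHc(byte[] header) {
            XXHash32 xxhash = XXHashFactory.fastestInstance().hash32();
            return (byte) ((xxhash.hash(header, 4, 2, 0) >> 8) & 0xFF);  // spec: hash FLG+BD only
        }
        static byte legacyHc(byte[] header) {
            XXHash32 xxhash = XXHashFactory.fastestInstance().hash32();
            return (byte) ((xxhash.hash(header, 0, 6, 0) >> 8) & 0xFF);  // old code: magic included
        }
    }
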
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 7d23f4aa4c4..933b2cfaa30 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -29,10 +29,9 @@ import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;
/**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
*
- * @see LZ4 Framing
- * Format Spec
+ * @see LZ4 Frame Format
*/
public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
@@ -49,6 +48,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
private final LZ4Compressor compressor;
private final XXHash32 checksum;
+ private final boolean useBrokenFlagDescriptorChecksum;
private final FLG flg;
private final BD bd;
private final byte[] buffer;
@@ -65,12 +65,15 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
* values will generate an exception
* @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
* every block of data
+ * @param useBrokenFlagDescriptorChecksum Default: false. When true, writes an incorrect Frame Descriptor checksum
+ * compatible with older Kafka clients.
* @throws IOException
*/
- public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
super(out);
compressor = LZ4Factory.fastestInstance().fastCompressor();
checksum = XXHashFactory.fastestInstance().hash32();
+ this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
bd = new BD(blockSize);
flg = new FLG(blockChecksum);
bufferOffset = 0;
@@ -81,6 +84,20 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
writeHeader();
}
+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The output stream to compress
+ * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+ * values will generate an exception
+ * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
+ * every block of data
+ * @throws IOException
+ */
+ public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+ this(out, blockSize, blockChecksum, false);
+ }
+
/**
* Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
*
@@ -90,7 +107,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
* @throws IOException
*/
public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
- this(out, blockSize, false);
+ this(out, blockSize, false, false);
}
/**
@@ -103,6 +120,19 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
this(out, BLOCKSIZE_64KB);
}
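+ /**
+ * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+ *
+ * @param out The output stream to compress
+ * @param useBrokenHC Default: false. When true, writes an incorrect Frame Descriptor checksum
+ * compatible with older Kafka clients.
+ * @throws IOException
+ */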
+ public KafkaLZ4BlockOutputStream(OutputStream out, boolean useBrokenHC) throws IOException {
+ this(out, BLOCKSIZE_64KB, false, useBrokenHC);
+ }
+
+ /**
+ * Check whether KafkaLZ4BlockOutputStream is configured to write an
+ * incorrect Frame Descriptor checksum, which is useful for
+ * compatibility with old client implementations.
+ */
+ public boolean useBrokenFlagDescriptorChecksum() {
+ return this.useBrokenFlagDescriptorChecksum;
+ }
+
/**
* Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
*
@@ -114,10 +144,17 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
buffer[bufferOffset++] = flg.toByte();
buffer[bufferOffset++] = bd.toByte();
// TODO write uncompressed content size, update flg.validate()
- // TODO write dictionary id, update flg.validate()
+
// compute checksum on all descriptor fields
- int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
- buffer[bufferOffset++] = (byte) hash;
+ int offset = 4;
+ int len = bufferOffset - offset;
+ if (this.useBrokenFlagDescriptorChecksum) {
+ len += offset;
+ offset = 0;
+ }
+ byte hash = (byte) ((checksum.hash(buffer, offset, len, 0) >> 8) & 0xFF);
+ buffer[bufferOffset++] = hash;
+
// write out frame descriptor
out.write(buffer, 0, bufferOffset);
bufferOffset = 0;
@@ -236,8 +273,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
private static final int VERSION = 1;
- private final int presetDictionary;
- private final int reserved1;
+ private final int reserved;
private final int contentChecksum;
private final int contentSize;
private final int blockChecksum;
@@ -249,18 +285,16 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
}
public FLG(boolean blockChecksum) {
- this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+ this(0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
}
- private FLG(int presetDictionary,
- int reserved1,
+ private FLG(int reserved,
int contentChecksum,
int contentSize,
int blockChecksum,
int blockIndependence,
int version) {
- this.presetDictionary = presetDictionary;
- this.reserved1 = reserved1;
+ this.reserved = reserved;
this.contentChecksum = contentChecksum;
this.contentSize = contentSize;
this.blockChecksum = blockChecksum;
@@ -270,16 +304,14 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
}
public static FLG fromByte(byte flg) {
- int presetDictionary = (flg >>> 0) & 1;
- int reserved1 = (flg >>> 1) & 1;
+ int reserved = (flg >>> 0) & 3;
int contentChecksum = (flg >>> 2) & 1;
int contentSize = (flg >>> 3) & 1;
int blockChecksum = (flg >>> 4) & 1;
int blockIndependence = (flg >>> 5) & 1;
int version = (flg >>> 6) & 3;
- return new FLG(presetDictionary,
- reserved1,
+ return new FLG(reserved,
contentChecksum,
contentSize,
blockChecksum,
@@ -288,22 +320,13 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
}
public byte toByte() {
- return (byte) (((presetDictionary & 1) << 0) | ((reserved1 & 1) << 1) | ((contentChecksum & 1) << 2)
+ return (byte) (((reserved & 3) << 0) | ((contentChecksum & 1) << 2)
| ((contentSize & 1) << 3) | ((blockChecksum & 1) << 4) | ((blockIndependence & 1) << 5) | ((version & 3) << 6));
}
private void validate() {
- if (presetDictionary != 0) {
- throw new RuntimeException("Preset dictionary is unsupported");
- }
- if (reserved1 != 0) {
- throw new RuntimeException("Reserved1 field must be 0");
- }
- if (contentChecksum != 0) {
- throw new RuntimeException("Content checksum is unsupported");
- }
- if (contentSize != 0) {
- throw new RuntimeException("Content size is unsupported");
+ if (reserved != 0) {
+ throw new RuntimeException("Reserved bits must be 0");
}
if (blockIndependence != 1) {
throw new RuntimeException("Dependent block stream is unsupported");
@@ -313,10 +336,6 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
}
}
- public boolean isPresetDictionarySet() {
- return presetDictionary == 1;
- }
-
public boolean isContentChecksumSet() {
return contentChecksum == 1;
}
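
Dropping the preset-dictionary bit leaves the two low bits as a single reserved field, so the layout encoded by toByte()/fromByte() is: version (bits 6-7), block independence (bit 5), block checksum (bit 4), content size (bit 3), content checksum (bit 2), reserved (bits 0-1). A worked example of the default descriptor this stream writes (hypothetical test-style snippet; assumes FLG is the public static nested class it appears to be): version = 01 and block independence = 1 give 0x60, and setting the block checksum bit would give 0x70.

    // Default FLG: only version (bits 6-7 = 01) and block independence (bit 5) are set.
    KafkaLZ4BlockOutputStream.FLG flg = new KafkaLZ4BlockOutputStream.FLG(false);
    assert flg.toByte() == (byte) 0x60;
    assert !KafkaLZ4BlockOutputStream.FLG.fromByte((byte) 0x60).isContentChecksumSet();
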
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 7175953d667..fcf7f446a45 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -187,10 +187,10 @@ public class MemoryRecords implements Records {
public Iterator<LogEntry> iterator() {
if (writable) {
// flip on a duplicate buffer for reading
- return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), CompressionType.NONE, false);
+ return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), false);
} else {
// do not need to flip for non-writable buffer
- return new RecordsIterator(this.buffer.duplicate(), CompressionType.NONE, false);
+ return new RecordsIterator(this.buffer.duplicate(), false);
}
}
@@ -224,11 +224,11 @@ public class MemoryRecords implements Records {
private final ArrayDeque<LogEntry> logEntries;
private final long absoluteBaseOffset;
- public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
- this.type = type;
+ public RecordsIterator(ByteBuffer buffer, boolean shallow) {
+ this.type = CompressionType.NONE;
this.buffer = buffer;
this.shallow = shallow;
- this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+ this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
this.logEntries = null;
this.absoluteBaseOffset = -1;
}
@@ -238,7 +238,7 @@ public class MemoryRecords implements Records {
this.type = entry.record().compressionType();
this.buffer = entry.record().value();
this.shallow = true;
- this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+ this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
long wrapperRecordOffset = entry.offset();
// If relative offset is used, we need to decompress the entire message first to compute
// the absolute offset.
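
Only the deep iterator decompresses a wrapper record, so it is the only constructor that needs the record's magic; the shallow path reads the outer, uncompressed entries and can use a plain DataInputStream. In effect the deep path now does the following (a recap sketch reusing names from the hunk above):

    // The wrapper record's magic byte decides whether the LZ4 reader tolerates the
    // legacy (incorrect) frame descriptor checksum: lenient for v0, strict for v1.
    DataInputStream stream = Compressor.wrapForInput(
            new ByteBufferInputStream(entry.record().value()),
            entry.record().compressionType(),
            entry.record().magic());
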
diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
new file mode 100644
index 00000000000..37877efe8fa
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class KafkaLZ4Test {
+
+ private final boolean useBrokenFlagDescriptorChecksum;
+ private final boolean ignoreFlagDescriptorChecksum;
+ private final byte[] payload;
+
+ public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload) {
+ this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
+ this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
+ this.payload = payload;
+ }
+
+ @Parameters
+ public static Collection