KAFKA-3160; Fix LZ4 Framing

This contribution is my original work and I license the work under Apache 2.0.

Author: Dana Powers <dana.powers@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1212 from dpkp/KAFKA-3160
Commit 8fe2552239 (parent c4bbf34243)
Dana Powers, 2016-05-07 19:35:55 +01:00, committed by Ismael Juma
11 changed files with 289 additions and 73 deletions

File: Compressor.java

@@ -77,7 +77,7 @@ public class Compressor {
         @Override
         public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
             return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
-                .getConstructor(InputStream.class);
+                .getConstructor(InputStream.class, Boolean.TYPE);
         }
     });

@@ -275,7 +275,7 @@ public class Compressor {
         }
     }

-    static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type) {
+    static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
         try {
             switch (type) {
                 case NONE:

@@ -291,7 +291,8 @@ public class Compressor {
                     }
                 case LZ4:
                     try {
-                        InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer);
+                        InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
+                            messageVersion == Record.MAGIC_VALUE_V0);
                         return new DataInputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);

File: KafkaLZ4BlockInputStream.java

@@ -36,15 +36,14 @@ import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;

 /**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
  *
- * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
- * Format Spec</a>
+ * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a>
  */
 public final class KafkaLZ4BlockInputStream extends FilterInputStream {

     public static final String PREMATURE_EOS = "Stream ended prematurely";
-    public static final String NOT_SUPPORTED = "Stream unsupported";
+    public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
     public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
     public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
@@ -53,6 +52,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
     private final byte[] buffer;
     private final byte[] compressedBuffer;
     private final int maxBlockSize;
+    private final boolean ignoreFlagDescriptorChecksum;
     private FLG flg;
     private BD bd;
     private int bufferOffset;
@@ -63,12 +63,14 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
      * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
      *
      * @param in The stream to decompress
+     * @param ignoreFlagDescriptorChecksum for compatibility with old kafka clients, ignore incorrect HC byte
      * @throws IOException
      */
-    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+    public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
         super(in);
         decompressor = LZ4Factory.fastestInstance().safeDecompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
+        this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
         readHeader();
         maxBlockSize = bd.getBlockMaximumSize();
         buffer = new byte[maxBlockSize];
@@ -78,6 +80,25 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
         finished = false;
     }

+    /**
+     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+     *
+     * @param in The stream to decompress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+        this(in, false);
+    }
+
+    /**
+     * Check whether KafkaLZ4BlockInputStream is configured to ignore the
+     * Frame Descriptor checksum, which is useful for compatibility with
+     * old client implementations that use incorrect checksum calculations.
+     */
+    public boolean ignoreFlagDescriptorChecksum() {
+        return this.ignoreFlagDescriptorChecksum;
+    }
+
     /**
      * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
      *
@@ -87,25 +108,35 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
         byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];

         // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
-        bufferOffset = 6;
-        if (in.read(header, 0, bufferOffset) != bufferOffset) {
+        int headerOffset = 6;
+        if (in.read(header, 0, headerOffset) != headerOffset) {
             throw new IOException(PREMATURE_EOS);
         }

-        if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset - 6)) {
+        if (MAGIC != Utils.readUnsignedIntLE(header, headerOffset - 6)) {
             throw new IOException(NOT_SUPPORTED);
         }
-        flg = FLG.fromByte(header[bufferOffset - 2]);
-        bd = BD.fromByte(header[bufferOffset - 1]);
-        // TODO read uncompressed content size, update flg.validate()
-        // TODO read dictionary id, update flg.validate()
+        flg = FLG.fromByte(header[headerOffset - 2]);
+        bd = BD.fromByte(header[headerOffset - 1]);

-        // check stream descriptor hash
-        byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
-        header[bufferOffset++] = (byte) in.read();
-        if (hash != header[bufferOffset - 1]) {
-            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+        if (flg.isContentSizeSet()) {
+            if (in.read(header, headerOffset, 8) != 8)
+                throw new IOException(PREMATURE_EOS);
+            headerOffset += 8;
         }
+
+        // Final byte of Frame Descriptor is HC checksum
+        header[headerOffset++] = (byte) in.read();
+
+        // Old implementations produced incorrect HC checksums
+        if (ignoreFlagDescriptorChecksum)
+            return;
+
+        int offset = 4;
+        int len = headerOffset - offset - 1; // dont include magic bytes or HC
+        byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
+        if (hash != header[headerOffset - 1])
+            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
     }

     /**

@@ -120,7 +151,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
         // Check for EndMark
         if (blockSize == 0) {
             finished = true;
-            // TODO implement content checksum, update flg.validate()
+            if (flg.isContentChecksumSet())
+                Utils.readUnsignedIntLE(in); // TODO: verify this content checksum
             return;
         } else if (blockSize > maxBlockSize) {
             throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
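
The net effect of the readHeader changes is that the one-byte header checksum (HC) is now computed the way the LZ4 Frame spec requires: xxhash32 with seed 0 over the frame descriptor only (FLG, BD, and the optional content-size field), keeping the second byte of the hash. The old code started the hash at offset 0 and therefore also covered the 4 magic bytes. A minimal standalone sketch of the two calculations follows; it is not part of the patch, the class and method names are illustrative, and the XXHashFactory calls are the same lz4-java API already used above.

    import net.jpountz.xxhash.XXHash32;
    import net.jpountz.xxhash.XXHashFactory;

    // Illustrative only: HC byte for a header laid out as
    // [4-byte magic][FLG][BD][optional 8-byte content size], where descriptorEnd
    // is the index at which the HC byte itself will be written.
    public final class Lz4HeaderChecksumSketch {

        private static final XXHash32 XXHASH = XXHashFactory.fastestInstance().hash32();

        // Spec-compliant HC (what readHeader/writeHeader now compute): hash the frame
        // descriptor only, skipping the 4 magic bytes, and keep byte 1 of the hash.
        public static byte specHc(byte[] header, int descriptorEnd) {
            int offset = 4;
            int len = descriptorEnd - offset;
            return (byte) ((XXHASH.hash(header, offset, len, 0) >> 8) & 0xFF);
        }

        // Legacy Kafka HC (what the broken/ignored checksum mode reproduces for
        // message format v0): the hash starts at offset 0 and so includes the magic bytes.
        public static byte legacyHc(byte[] header, int descriptorEnd) {
            return (byte) ((XXHASH.hash(header, 0, descriptorEnd, 0) >> 8) & 0xFF);
        }
    }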

File: KafkaLZ4BlockOutputStream.java

@@ -29,10 +29,9 @@ import net.jpountz.xxhash.XXHash32;
 import net.jpountz.xxhash.XXHashFactory;

 /**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
  *
- * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
- * Format Spec</a>
+ * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a>
  */
 public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {

@@ -49,6 +48,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
     private final LZ4Compressor compressor;
     private final XXHash32 checksum;
+    private final boolean useBrokenFlagDescriptorChecksum;
     private final FLG flg;
     private final BD bd;
     private final byte[] buffer;
@@ -65,12 +65,15 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
      *            values will generate an exception
      * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
      *            every block of data
+     * @param useBrokenFlagDescriptorChecksum Default: false. When true, writes an incorrect FrameDescriptor checksum
+     *            compatible with older kafka clients.
      * @throws IOException
      */
-    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
         super(out);
         compressor = LZ4Factory.fastestInstance().fastCompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
+        this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
         bd = new BD(blockSize);
         flg = new FLG(blockChecksum);
         bufferOffset = 0;

@@ -81,6 +84,20 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         writeHeader();
     }

+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
+     * @param out The output stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+     *            values will generate an exception
+     * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
+     *            every block of data
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+        this(out, blockSize, blockChecksum, false);
+    }
+
     /**
      * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
      *
@@ -90,7 +107,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
      * @throws IOException
      */
     public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
-        this(out, blockSize, false);
+        this(out, blockSize, false, false);
     }

     /**

@@ -103,6 +120,19 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         this(out, BLOCKSIZE_64KB);
     }

+    public KafkaLZ4BlockOutputStream(OutputStream out, boolean useBrokenHC) throws IOException {
+        this(out, BLOCKSIZE_64KB, false, useBrokenHC);
+    }
+
+    /**
+     * Check whether KafkaLZ4BlockInputStream is configured to write an
+     * incorrect Frame Descriptor checksum, which is useful for
+     * compatibility with old client implementations.
+     */
+    public boolean useBrokenFlagDescriptorChecksum() {
+        return this.useBrokenFlagDescriptorChecksum;
+    }
+
     /**
      * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
      *
@@ -114,10 +144,17 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         buffer[bufferOffset++] = flg.toByte();
         buffer[bufferOffset++] = bd.toByte();
         // TODO write uncompressed content size, update flg.validate()
         // TODO write dictionary id, update flg.validate()

         // compute checksum on all descriptor fields
-        int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
-        buffer[bufferOffset++] = (byte) hash;
+        int offset = 4;
+        int len = bufferOffset - offset;
+        if (this.useBrokenFlagDescriptorChecksum) {
+            len += offset;
+            offset = 0;
+        }
+        byte hash = (byte) ((checksum.hash(buffer, offset, len, 0) >> 8) & 0xFF);
+        buffer[bufferOffset++] = hash;

         // write out frame descriptor
         out.write(buffer, 0, bufferOffset);
         bufferOffset = 0;
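
For orientation, a minimal round trip through the two streams as they behave after this change. It mirrors the parameterized KafkaLZ4Test added below and is illustrative only: the wrapping class and the payload are assumptions, while the constructors are the ones introduced in this patch.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class Lz4RoundTripSketch {
        public static void main(String[] args) throws IOException {
            byte[] payload = "hello, kafka".getBytes(StandardCharsets.UTF_8);

            // Default constructors: spec-compliant frame descriptor checksum (broken mode off).
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(sink);
            out.write(payload);
            out.close(); // flushes buffered data and writes the LZ4 end mark

            // Strict reader (ignoreFlagDescriptorChecksum = false): a legacy broken checksum
            // in the header would surface as IOException(DESCRIPTOR_HASH_MISMATCH).
            KafkaLZ4BlockInputStream in =
                new KafkaLZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray()), false);
            byte[] roundTrip = new byte[payload.length];
            int n = in.read(roundTrip, 0, roundTrip.length); // expect payload.length for this small payload
            in.close();
        }
    }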
@@ -236,8 +273,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         private static final int VERSION = 1;

-        private final int presetDictionary;
-        private final int reserved1;
+        private final int reserved;
         private final int contentChecksum;
         private final int contentSize;
         private final int blockChecksum;

@@ -249,18 +285,16 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         }

         public FLG(boolean blockChecksum) {
-            this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+            this(0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
         }

-        private FLG(int presetDictionary,
-                    int reserved1,
+        private FLG(int reserved,
                     int contentChecksum,
                     int contentSize,
                     int blockChecksum,
                     int blockIndependence,
                     int version) {
-            this.presetDictionary = presetDictionary;
-            this.reserved1 = reserved1;
+            this.reserved = reserved;
             this.contentChecksum = contentChecksum;
             this.contentSize = contentSize;
             this.blockChecksum = blockChecksum;
@@ -270,16 +304,14 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         }

         public static FLG fromByte(byte flg) {
-            int presetDictionary = (flg >>> 0) & 1;
-            int reserved1 = (flg >>> 1) & 1;
+            int reserved = (flg >>> 0) & 3;
             int contentChecksum = (flg >>> 2) & 1;
             int contentSize = (flg >>> 3) & 1;
             int blockChecksum = (flg >>> 4) & 1;
             int blockIndependence = (flg >>> 5) & 1;
             int version = (flg >>> 6) & 3;

-            return new FLG(presetDictionary,
-                           reserved1,
+            return new FLG(reserved,
                            contentChecksum,
                            contentSize,
                            blockChecksum,

@@ -288,22 +320,13 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
         }

         public byte toByte() {
-            return (byte) (((presetDictionary & 1) << 0) | ((reserved1 & 1) << 1) | ((contentChecksum & 1) << 2)
+            return (byte) (((reserved & 3) << 0) | ((contentChecksum & 1) << 2)
                 | ((contentSize & 1) << 3) | ((blockChecksum & 1) << 4) | ((blockIndependence & 1) << 5) | ((version & 3) << 6));
         }

         private void validate() {
-            if (presetDictionary != 0) {
-                throw new RuntimeException("Preset dictionary is unsupported");
-            }
-            if (reserved1 != 0) {
-                throw new RuntimeException("Reserved1 field must be 0");
-            }
-            if (contentChecksum != 0) {
-                throw new RuntimeException("Content checksum is unsupported");
-            }
-            if (contentSize != 0) {
-                throw new RuntimeException("Content size is unsupported");
+            if (reserved != 0) {
+                throw new RuntimeException("Reserved bits must be 0");
             }
             if (blockIndependence != 1) {
                 throw new RuntimeException("Dependent block stream is unsupported");

@@ -313,10 +336,6 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
             }
         }

-        public boolean isPresetDictionarySet() {
-            return presetDictionary == 1;
-        }
-
         public boolean isContentChecksumSet() {
             return contentChecksum == 1;
         }

File: MemoryRecords.java

@@ -187,10 +187,10 @@ public class MemoryRecords implements Records {
     public Iterator<LogEntry> iterator() {
         if (writable) {
             // flip on a duplicate buffer for reading
-            return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), CompressionType.NONE, false);
+            return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), false);
         } else {
             // do not need to flip for non-writable buffer
-            return new RecordsIterator(this.buffer.duplicate(), CompressionType.NONE, false);
+            return new RecordsIterator(this.buffer.duplicate(), false);
         }
     }

@@ -224,11 +224,11 @@ public class MemoryRecords implements Records {
         private final ArrayDeque<LogEntry> logEntries;
         private final long absoluteBaseOffset;

-        public RecordsIterator(ByteBuffer buffer, CompressionType type, boolean shallow) {
-            this.type = type;
+        public RecordsIterator(ByteBuffer buffer, boolean shallow) {
+            this.type = CompressionType.NONE;
             this.buffer = buffer;
             this.shallow = shallow;
-            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+            this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
             this.logEntries = null;
             this.absoluteBaseOffset = -1;
         }

@@ -238,7 +238,7 @@ public class MemoryRecords implements Records {
             this.type = entry.record().compressionType();
             this.buffer = entry.record().value();
             this.shallow = true;
-            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+            this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
             long wrapperRecordOffset = entry.offset();
             // If relative offset is used, we need to decompress the entire message first to compute
             // the absolute offset.

File: KafkaLZ4Test.java (new file)

@@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import net.jpountz.xxhash.XXHashFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(value = Parameterized.class)
public class KafkaLZ4Test {
private final boolean useBrokenFlagDescriptorChecksum;
private final boolean ignoreFlagDescriptorChecksum;
private final byte[] payload;
public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload) {
this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
this.payload = payload;
}
@Parameters
public static Collection<Object[]> data() {
byte[] payload = new byte[1000];
Arrays.fill(payload, (byte) 1);
List<Object[]> values = new ArrayList<Object[]>();
for (boolean broken : Arrays.asList(false, true))
for (boolean ignore : Arrays.asList(false, true))
values.add(new Object[] {broken, ignore, payload});
return values;
}
@Test
public void testKafkaLZ4() throws IOException {
ByteArrayOutputStream output = new ByteArrayOutputStream();
KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(output, this.useBrokenFlagDescriptorChecksum);
lz4.write(this.payload, 0, this.payload.length);
lz4.flush();
byte[] compressed = output.toByteArray();
// Check magic bytes stored as little-endian
int offset = 0;
assertEquals(compressed[offset++], 0x04);
assertEquals(compressed[offset++], 0x22);
assertEquals(compressed[offset++], 0x4D);
assertEquals(compressed[offset++], 0x18);
// Check flg descriptor
byte flg = compressed[offset++];
// 2-bit version must be 01
int version = (flg >>> 6) & 3;
assertEquals(version, 1);
// Reserved bits should always be 0
int reserved = flg & 3;
assertEquals(reserved, 0);
// Check block descriptor
byte bd = compressed[offset++];
// Block max-size
int blockMaxSize = (bd >>> 4) & 7;
// Only supported values are 4 (64KB), 5 (256KB), 6 (1MB), 7 (4MB)
assertTrue(blockMaxSize >= 4);
assertTrue(blockMaxSize <= 7);
// Multiple reserved bit ranges in block descriptor
reserved = bd & 15;
assertEquals(reserved, 0);
reserved = (bd >>> 7) & 1;
assertEquals(reserved, 0);
// If flg descriptor sets content size flag
// there are 8 additional bytes before checksum
boolean contentSize = ((flg >>> 3) & 1) != 0;
if (contentSize)
offset += 8;
// Checksum applies to frame descriptor: flg, bd, and optional contentsize
// so initial offset should be 4 (for magic bytes)
int off = 4;
int len = offset - 4;
// Initial implementation of checksum incorrectly applied to full header
// including magic bytes
if (this.useBrokenFlagDescriptorChecksum) {
off = 0;
len = offset;
}
int hash = XXHashFactory.fastestInstance().hash32().hash(compressed, off, len, 0);
byte hc = compressed[offset++];
assertEquals(hc, (byte) ((hash >> 8) & 0xFF));
ByteArrayInputStream input = new ByteArrayInputStream(compressed);
try {
KafkaLZ4BlockInputStream decompressed = new KafkaLZ4BlockInputStream(input, this.ignoreFlagDescriptorChecksum);
byte[] testPayload = new byte[this.payload.length];
int ret = decompressed.read(testPayload, 0, this.payload.length);
assertEquals(ret, this.payload.length);
assertArrayEquals(this.payload, testPayload);
} catch (IOException e) {
assertTrue(this.useBrokenFlagDescriptorChecksum && !this.ignoreFlagDescriptorChecksum);
}
}
}

File: LogCleaner.scala

@@ -499,7 +499,7 @@ private[log] class Cleaner(val id: Int,
       val timestampType = firstMessageOffset.message.timestampType
       val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
       messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = messageFormatVersion) { outputStream =>
-        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
+        val output = new DataOutputStream(CompressionFactory(compressionCodec, messageFormatVersion, outputStream))
         try {
           for (messageOffset <- messageAndOffsets) {
             val message = messageOffset.message

File: ByteBufferMessageSet.scala

@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong

 import scala.collection.JavaConverters._

+import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils

@@ -55,7 +56,7 @@ object ByteBufferMessageSet {
     var offset = -1L
     val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
     messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
-      val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
+      val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
       try {
         for (message <- messages) {
           offset = offsetAssigner.nextAbsoluteOffset()

@@ -95,7 +96,7 @@ object ByteBufferMessageSet {
       if (wrapperMessage.payload == null)
         throw new KafkaException(s"Message payload is null: $wrapperMessage")
       val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload)
-      val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream))
+      val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
       var lastInnerOffset = -1L

       val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {

@@ -107,7 +108,7 @@ object ByteBufferMessageSet {
         case eofe: EOFException =>
           compressed.close()
         case ioe: IOException =>
-          throw new KafkaException(ioe)
+          throw new CorruptRecordException(ioe)
       }
       Some(innerMessageAndOffsets)
     } else None

File: CompressionFactory.scala

@@ -26,7 +26,7 @@ import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}

 object CompressionFactory {

-  def apply(compressionCodec: CompressionCodec, stream: OutputStream): OutputStream = {
+  def apply(compressionCodec: CompressionCodec, messageVersion: Byte, stream: OutputStream): OutputStream = {
     compressionCodec match {
       case DefaultCompressionCodec => new GZIPOutputStream(stream)
       case GZIPCompressionCodec => new GZIPOutputStream(stream)

@@ -34,13 +34,13 @@ object CompressionFactory {
         import org.xerial.snappy.SnappyOutputStream
         new SnappyOutputStream(stream)
       case LZ4CompressionCodec =>
-        new KafkaLZ4BlockOutputStream(stream)
+        new KafkaLZ4BlockOutputStream(stream, messageVersion == Message.MagicValue_V0)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
   }

-  def apply(compressionCodec: CompressionCodec, stream: InputStream): InputStream = {
+  def apply(compressionCodec: CompressionCodec, messageVersion: Byte, stream: InputStream): InputStream = {
     compressionCodec match {
       case DefaultCompressionCodec => new GZIPInputStream(stream)
       case GZIPCompressionCodec => new GZIPInputStream(stream)

@@ -48,7 +48,7 @@ object CompressionFactory {
         import org.xerial.snappy.SnappyInputStream
         new SnappyInputStream(stream)
       case LZ4CompressionCodec =>
-        new KafkaLZ4BlockInputStream(stream)
+        new KafkaLZ4BlockInputStream(stream, messageVersion == Message.MagicValue_V0)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }

File: MessageCompressionTest.scala

@@ -17,6 +17,9 @@

 package kafka.message

+import org.apache.kafka.common.record._
+
+import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream

 import scala.collection._
 import org.scalatest.junit.JUnitSuite

@@ -25,6 +28,24 @@ import org.junit.Assert._

 class MessageCompressionTest extends JUnitSuite {

+  @Test
+  def testLZ4FramingV0() {
+    val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayOutputStream())
+    assertTrue(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum())
+    val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, 0x1A)))
+    assertTrue(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum())
+  }
+
+  @Test
+  def testLZ4FramingV1() {
+    val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayOutputStream())
+    assertFalse(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum())
+    val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, -126)))
+    assertFalse(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum())
+  }
+
   @Test
   def testSimpleCompressDecompress() {
     val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec)

File: MessageWriterTest.scala

@@ -36,7 +36,7 @@ class MessageWriterTest extends JUnitSuite {
   private def mkMessageWithWriter(key: Array[Byte] = null, bytes: Array[Byte], codec: CompressionCodec): Message = {
     val writer = new MessageWriter(100)
     writer.write(key = key, codec = codec, timestamp = Message.NoTimestamp, timestampType = TimestampType.CREATE_TIME, magicValue = Message.MagicValue_V1) { output =>
-      val out = if (codec == NoCompressionCodec) output else CompressionFactory(codec, output)
+      val out = if (codec == NoCompressionCodec) output else CompressionFactory(codec, Message.MagicValue_V1, output)
       try {
         val p = rnd.nextInt(bytes.length)
         out.write(bytes, 0, p)

@@ -53,14 +53,14 @@ class MessageWriterTest extends JUnitSuite {

   private def compress(bytes: Array[Byte], codec: CompressionCodec): Array[Byte] = {
     val baos = new ByteArrayOutputStream()
-    val out = CompressionFactory(codec, baos)
+    val out = CompressionFactory(codec, Message.MagicValue_V1, baos)
     out.write(bytes)
     out.close()
     baos.toByteArray
   }

   private def decompress(compressed: Array[Byte], codec: CompressionCodec): Array[Byte] = {
-    toArray(CompressionFactory(codec, new ByteArrayInputStream(compressed)))
+    toArray(CompressionFactory(codec, Message.MagicValue_V1, new ByteArrayInputStream(compressed)))
   }

   private def toArray(in: InputStream): Array[Byte] = {

File: upgrade.html (docs)

@@ -80,6 +80,11 @@ work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9
     <li> MirrorMakerMessageHandler no longer exposes the <code>handle(record: MessageAndMetadata[Array[Byte], Array[Byte]])</code> method as it was never called. </li>
     <li> The 0.7 KafkaMigrationTool is no longer packaged with Kafka. If you need to migrate from 0.7 to 0.10.0, please migrate to 0.8 first and then follow the documented upgrade process to upgrade from 0.8 to 0.10.0. </li>
     <li> The new consumer has standardized its APIs to accept <code>java.util.Collection</code> as the sequence type for method parameters. Existing code may have to be updated to work with the 0.10.0 client library. </li>
+    <li> LZ4-compressed message handling was changed to use an interoperable framing specification (LZ4f v1.5.1).
+         To maintain compatibility with old clients, this change only applies to Message format 0.10.0 and later.
+         Clients that Produce/Fetch LZ4-compressed messages using v0/v1 (Message format 0.9.0) should continue
+         to use the 0.9.0 framing implementation. Clients that use Produce/Fetch protocols v2 or later
+         should use interoperable LZ4f framing. A list of interoperable LZ4 libraries is available at http://www.lz4.org/
 </ul>

 <h5><a id="upgrade_10_notable" href="#upgrade_10_notable">Notable changes in 0.10.0.0</a></h5>
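
In practice the compatibility rule in the upgrade note comes down to choosing the framing mode from the message format (magic) byte, which is what CompressionFactory and Compressor.wrapForInput do in this patch. A hedged sketch follows; the helper class name and the bare magic == 0 comparison are illustrative (the patch itself compares against Record.MAGIC_VALUE_V0 on the client side and Message.MagicValue_V0 on the broker side).

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Illustrative helper only: pick legacy vs interoperable LZ4 framing by message format version.
    public final class Lz4FramingSelector {

        // magic 0 == message format 0.9.0; only that format keeps the old (broken) framing.
        public static OutputStream wrapForOutput(OutputStream raw, byte magic) throws IOException {
            return new KafkaLZ4BlockOutputStream(raw, magic == 0 /* useBrokenFlagDescriptorChecksum */);
        }

        public static InputStream wrapForInput(InputStream raw, byte magic) throws IOException {
            return new KafkaLZ4BlockInputStream(raw, magic == 0 /* ignoreFlagDescriptorChecksum */);
        }
    }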