mirror of https://github.com/apache/kafka.git
KAFKA-2421: Upgrade LZ4 to version 1.3
A few notes on the added test: * I verified this test fails when changing between snappy 1.1.1.2 and 1.1.1.7 (per KAFKA-2189) * The hard coded numbers are passing before and after lzo change Author: Grant Henke <granthenke@gmail.com> Reviewers: Ismael Juma, Guozhang Wang Closes #552 from granthenke/lz4
This commit is contained in:
parent
0a52ddfd03
commit
69269e76a4
|
@ -408,7 +408,7 @@ project(':clients') {
|
|||
dependencies {
|
||||
compile "$slf4japi"
|
||||
compile 'org.xerial.snappy:snappy-java:1.1.2'
|
||||
compile 'net.jpountz.lz4:lz4:1.2.0'
|
||||
compile 'net.jpountz.lz4:lz4:1.3'
|
||||
|
||||
testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52'
|
||||
testCompile "$junit"
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
|
@ -37,7 +37,7 @@ import net.jpountz.xxhash.XXHashFactory;
|
|||
|
||||
/**
|
||||
* A partial implementation of the v1.4.1 LZ4 Frame format.
|
||||
*
|
||||
*
|
||||
* @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
|
||||
* Format Spec</a>
|
||||
*/
|
||||
|
@ -61,7 +61,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
|
|||
|
||||
/**
|
||||
* Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
|
||||
*
|
||||
*
|
||||
* @param in The stream to decompress
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -80,7 +80,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
|
|||
|
||||
/**
|
||||
* Reads the magic number and frame descriptor from the underlying {@link InputStream}.
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void readHeader() throws IOException {
|
||||
|
@ -111,7 +111,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
|
|||
/**
|
||||
* Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, and writes the
|
||||
* result to a buffer.
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void readBlock() throws IOException {
|
||||
|
@ -174,7 +174,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
|
|||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
net.jpountz.util.Utils.checkRange(b, off, len);
|
||||
net.jpountz.util.SafeUtils.checkRange(b, off, len);
|
||||
if (finished) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
|
@ -30,7 +30,7 @@ import net.jpountz.xxhash.XXHashFactory;
|
|||
|
||||
/**
|
||||
* A partial implementation of the v1.4.1 LZ4 Frame format.
|
||||
*
|
||||
*
|
||||
* @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
|
||||
* Format Spec</a>
|
||||
*/
|
||||
|
@ -59,7 +59,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
|
|||
|
||||
/**
|
||||
* Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
|
||||
*
|
||||
*
|
||||
* @param out The output stream to compress
|
||||
* @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
|
||||
* values will generate an exception
|
||||
|
@ -83,7 +83,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
|
|||
|
||||
/**
|
||||
* Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
|
||||
*
|
||||
*
|
||||
* @param out The stream to compress
|
||||
* @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
|
||||
* values will generate an exception
|
||||
|
@ -95,7 +95,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
|
|||
|
||||
/**
|
||||
* Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
|
||||
*
|
||||
*
|
||||
* @param out The output stream to compress
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -105,7 +105,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
|
|||
|
||||
/**
|
||||
* Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void writeHeader() throws IOException {
|
||||
|
@ -126,7 +126,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
|
|||
/**
|
||||
* Compresses buffered data, optionally computes an XXHash32 checksum, and writes the result to the underlying
|
||||
* {@link OutputStream}.
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void writeBlock() throws IOException {
|
||||
|
@ -160,7 +160,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
|
|||
/**
|
||||
* Similar to the {@link #writeBlock()} method. Writes a 0-length block (without block checksum) to signal the end
|
||||
* of the block stream.
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void writeEndMark() throws IOException {
|
||||
|
@ -180,7 +180,7 @@ public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
|
|||
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len) throws IOException {
|
||||
net.jpountz.util.Utils.checkRange(b, off, len);
|
||||
net.jpountz.util.SafeUtils.checkRange(b, off, len);
|
||||
ensureNotFinished();
|
||||
|
||||
int bufferRemainingLength = maxBlockSize - bufferOffset;
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.junit._
|
|||
import org.junit.Assert._
|
||||
|
||||
class MessageCompressionTest extends JUnitSuite {
|
||||
|
||||
|
||||
@Test
|
||||
def testSimpleCompressDecompress() {
|
||||
val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec)
|
||||
|
@ -35,19 +35,40 @@ class MessageCompressionTest extends JUnitSuite {
|
|||
for(codec <- codecs)
|
||||
testSimpleCompressDecompress(codec)
|
||||
}
|
||||
|
||||
|
||||
// A quick test to ensure any growth or increase in compression size is known when upgrading libraries
|
||||
@Test
|
||||
def testCompressSize() {
|
||||
val bytes1k: Array[Byte] = (0 until 1000).map(_.toByte).toArray
|
||||
val bytes2k: Array[Byte] = (1000 until 2000).map(_.toByte).toArray
|
||||
val bytes3k: Array[Byte] = (3000 until 4000).map(_.toByte).toArray
|
||||
val messages: List[Message] = List(new Message(bytes1k), new Message(bytes2k), new Message(bytes3k))
|
||||
|
||||
testCompressSize(GZIPCompressionCodec, messages, 388)
|
||||
|
||||
if(isSnappyAvailable)
|
||||
testCompressSize(SnappyCompressionCodec, messages, 491)
|
||||
|
||||
if(isLZ4Available)
|
||||
testCompressSize(LZ4CompressionCodec, messages, 380)
|
||||
}
|
||||
|
||||
def testSimpleCompressDecompress(compressionCodec: CompressionCodec) {
|
||||
val messages = List[Message](new Message("hi there".getBytes), new Message("I am fine".getBytes), new Message("I am not so well today".getBytes))
|
||||
val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages:_*)
|
||||
assertEquals(compressionCodec, messageSet.shallowIterator.next.message.compressionCodec)
|
||||
assertEquals(compressionCodec, messageSet.shallowIterator.next().message.compressionCodec)
|
||||
val decompressed = messageSet.iterator.map(_.message).toList
|
||||
assertEquals(messages, decompressed)
|
||||
}
|
||||
|
||||
|
||||
def isSnappyAvailable(): Boolean = {
|
||||
def testCompressSize(compressionCodec: CompressionCodec, messages: List[Message], expectedSize: Int) {
|
||||
val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages:_*)
|
||||
assertEquals(s"$compressionCodec size has changed.", expectedSize, messageSet.sizeInBytes)
|
||||
}
|
||||
|
||||
def isSnappyAvailable: Boolean = {
|
||||
try {
|
||||
val snappy = new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream())
|
||||
new org.xerial.snappy.SnappyOutputStream(new ByteArrayOutputStream())
|
||||
true
|
||||
} catch {
|
||||
case e: UnsatisfiedLinkError => false
|
||||
|
@ -55,9 +76,9 @@ class MessageCompressionTest extends JUnitSuite {
|
|||
}
|
||||
}
|
||||
|
||||
def isLZ4Available(): Boolean = {
|
||||
def isLZ4Available: Boolean = {
|
||||
try {
|
||||
val lz4 = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
|
||||
new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
|
||||
true
|
||||
} catch {
|
||||
case e: UnsatisfiedLinkError => false
|
||||
|
|
Loading…
Reference in New Issue